Spark Streaming: NullPointerException inside foreachPartition

Question

I have a Spark Streaming job which reads from Kafka and does some comparisons with an existing table in Postgres before writing to Postgres again. This is what it looks like:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)
    println("First")

    kafkaDF.foreachPartition(
      i =>{
        // the NullPointerException is thrown on this line
        val jdbcDF = sqlContext.read.format("jdbc").options(
          Map("url" -> "jdbc:postgresql://...",
            "dbtable" -> "table", "user" -> "user", "password" -> "pwd" )).load()

        createConnection()
        i.foreach(
          row =>{
            println("Second")
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )
  }
})

This code gives me a NullPointerException at the line val jdbcDF = ...

What am I doing wrong? Also, the "First" log line appears, but "Second" doesn't show up anywhere in the logs. I tried the entire code with kafkaDF.collect().foreach(...) and it works perfectly, but performance is very poor. I am looking to replace it with foreachPartition.

Thanks

Answer

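It is not clear whether there are any issues inside createConnection, closeConnection or connection.sendToTable, but the fundamental problem is the attempt to nest actions/transformations. This is not supported in Spark, and Spark Streaming is no different: the function passed to foreachPartition is serialized and executed on the executors, while sqlContext can only be used on the driver, so calling it inside that function fails (here with a NullPointerException). That also explains the missing "Second": each partition fails before it reaches i.foreach, and even when such code does run, println inside foreachPartition writes to the executors' stdout logs, not the driver's.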
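
To see where the null comes from, here is a minimal, Spark-free sketch in plain Scala. It assumes (as we understand the Spark 1.x sources) that SQLContext is Serializable but keeps its SparkContext in a @transient field. DriverOnlyHandle, FakeSqlContext and ClosureShipping are hypothetical stand-ins, and roundTrip only approximates how Spark ships a task closure to an executor with Java serialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for a driver-only object such as a SparkContext.
final class DriverOnlyHandle {
  def read(table: String): String = s"rows of $table"
}

// Hypothetical stand-in for SQLContext: the wrapper itself is Serializable,
// but the handle it depends on is @transient and is NOT serialized with it.
final class FakeSqlContext(@transient val handle: DriverOnlyHandle) extends Serializable {
  // Works on the "driver"; throws NullPointerException once handle is null.
  def load(table: String): String = handle.read(table)
}

object ClosureShipping {
  // Round-trips a value through Java serialization, roughly what happens when
  // a task closure (and everything it captures) is sent to an executor.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      // Resolve classes with this object's loader so the demo also works in scripts.
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, ClosureShipping.getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After the round trip the wrapper object still exists, but its handle is null (transient fields are not restored), so the first call through it fails with a NullPointerException, just like the val jdbcDF line in the question.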

This means that the nested DataFrame initialization (val jdbcDF = sqlContext.read.format ...) simply cannot work and should be removed. If you need it as a reference, it should be created at the same level as kafkaDF and referenced using standard transformations (unionAll, join, ...).

If for some reason that is not an acceptable solution, you can create a plain JDBC connection inside foreachPartition and operate on the PostgreSQL table directly (I guess that is what you are already doing inside sendToTable).
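
A minimal sketch of that pattern in Scala, assuming one INSERT per row through a PreparedStatement with a single string placeholder. RowSink, JdbcSink, PartitionWriter and insertSql are hypothetical names playing the roles of createConnection, connection.sendToTable and closeConnection from the question; only java.sql.DriverManager and the JDBC interfaces are real APIs.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical abstraction: something that receives rows and must be closed once.
trait RowSink[A] extends AutoCloseable {
  def send(row: A): Unit
}

// Hypothetical JDBC-backed sink: one plain java.sql.Connection per partition.
// Assumes insertSql contains exactly one "?" placeholder for a string value.
final class JdbcSink(url: String, user: String, password: String, insertSql: String)
    extends RowSink[String] {
  private val conn: Connection = DriverManager.getConnection(url, user, password)
  private val stmt: PreparedStatement =
    try conn.prepareStatement(insertSql)
    catch { case e: Exception => conn.close(); throw e } // don't leak the connection

  def send(row: String): Unit = {
    stmt.setString(1, row)
    stmt.executeUpdate()
  }

  def close(): Unit =
    try stmt.close() finally conn.close()
}

object PartitionWriter {
  // Opens one sink for the whole partition, sends every row through it and
  // always closes it, even if a row fails. Returns the number of rows sent.
  def writePartition[A](rows: Iterator[A], open: () => RowSink[A]): Int =
    if (!rows.hasNext) 0 // empty partition: don't open a connection at all
    else {
      val sink = open()
      try {
        var count = 0
        rows.foreach { row => sink.send(row); count += 1 }
        count
      } finally sink.close()
    }
}
```

Inside the job, the foreachPartition body would call PartitionWriter.writePartition with the partition's rows (mapped to whatever value you store) and a factory such as () => new JdbcSink(url, user, password, insertSql). Because the factory is invoked inside the closure, the connection is opened on the executor and nothing driver-side (sqlContext included) has to be captured.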