Azure DataBricks Stream foreach fails with NotSerializableException


Question


I want to continuously process rows of a dataset stream (originally initiated by Kafka): based on a condition I want to update a Redis hash. This is my code snippet (lastContacts is the result of a previous command, which is a stream of this type: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]. This expands to org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]):

class MyStreamProcessor extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()


I receive a huge stack trace, of which the relevant part (I think) is this: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter


Could anyone explain why this exception occurs and how to avoid it? Thank you!


This question is related to the following two:

  • DataFrame to RDD[(String, String)] conversion
  • Call a function with each element of a stream in Databricks

Answer


Spark Context is not serializable.

Any implementation of ForeachWriter must be serializable, because each task will get a fresh serialized-deserialized copy of the provided object. Hence, it is strongly recommended that any initialization for writing data (e.g. opening a connection or starting a transaction) is done only after the open(...) method has been called, which signifies that the task is ready to generate data.


In your code, you are trying to use the Spark context within the process method:

override def process(record: Row) = {
  val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
  sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
}


To send data to Redis, you need to create your own connection, open it in the open method, and then use it in the process method.


Take a look at how to create a Redis connection pool: https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
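For illustration, here is a minimal sketch of a writer that follows this pattern using the plain Jedis client instead of spark-redis (the class name RedisHashWriter, the host/port defaults, and the hash field name are assumptions, not part of the original question or answer):

import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.Jedis

// Hypothetical writer: host/port are placeholders for your Redis endpoint.
class RedisHashWriter(host: String = "localhost", port: Int = 6379)
    extends ForeachWriter[Row] {

  // Created per task inside open(), after deserialization, so nothing
  // non-serializable (like the SparkContext) is captured in the closure.
  @transient private var jedis: Jedis = _

  override def open(partitionId: Long, version: Long): Boolean = {
    jedis = new Jedis(host, port)
    true
  }

  override def process(record: Row): Unit = {
    // Write the row into a Redis hash keyed by the serial number.
    jedis.hset(record(0).toString, "lastContact", record(1).toString)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}

With a writer like this, lastContacts.writeStream.foreach(new RedisHashWriter()).start() only serializes the writer itself, so the NotSerializableException no longer occurs.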
