Azure Databricks Stream foreach fails with NotSerializableException
Question
I want to continuously process rows of a streaming Dataset (originally fed by Kafka): based on a condition, I want to update a Redis hash. This is my code snippet (lastContacts is the result of a previous command, which is a stream of this type: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]; this expands to org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]):
class MyStreamProcessor extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}
val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()
I receive a huge stack trace, of which the relevant part (I think) is this: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
Could anyone explain why this exception occurs and how to avoid it? Thank you!
This question is related to the following two:
- DataFrame to RDD[(String, String)] conversion
- Call a function with each element of a stream in Databricks
Answer
The Spark context is not serializable.
Any implementation of ForeachWriter must be serializable, because each task will get a fresh serialized-deserialized copy of the provided object. Hence, it is strongly recommended that any initialization for writing data (e.g. opening a connection or starting a transaction) be done after the open(...) method has been called, which signifies that the task is ready to generate data.
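The serialization failure itself has nothing to do with Spark specifically; it is plain Java serialization rejecting any captured field that is not Serializable. A minimal self-contained sketch (FakeContext here is a hypothetical stand-in for SparkContext, not a Spark class):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable resource such as SparkContext
class FakeContext

// Captures the resource as a field: serialization of this object fails
class BadWriter extends Serializable {
  val ctx = new FakeContext
}

// Marks the resource @transient and leaves it to be created per task
// (in open(), in the ForeachWriter case): serialization succeeds
class GoodWriter extends Serializable {
  @transient var ctx: FakeContext = _
}

def canSerialize(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }
```

Spark performs exactly this check when it ships the ForeachWriter to executors, which is why the stack trace reports NotSerializableException.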
In your code, you are trying to use the Spark context within the process method:
override def process(record: Row) = {
  val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
  sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
}
To send data to Redis, you need to create your own connection, open it in the open method, and then use it in the process method.
Take a look at how to create a Redis connection pool: https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
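A sketch of the corrected writer, using the Jedis client directly instead of spark-redis (it requires a running Spark cluster and Redis instance, so treat it as an outline; the host/port values and the RedisStreamProcessor name are placeholders for your deployment):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.Jedis

class RedisStreamProcessor(host: String, port: Int) extends ForeachWriter[Row] {
  // @transient + created in open(): the connection is never serialized,
  // so the writer itself stays serializable
  @transient private var jedis: Jedis = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    jedis = new Jedis(host, port)
    true
  }

  override def process(record: Row): Unit = {
    // record(0) = serialNumber (used as the hash key), record(1) = lastModified
    jedis.hset(record(0).toString, "lastContact", record(1).toString)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}

// Usage:
// val query = lastContacts
//   .writeStream
//   .foreach(new RedisStreamProcessor("localhost", 6379))
//   .start()
```

Only the String constructor parameters are captured at serialization time; the Jedis connection is opened per task after deserialization, which is exactly what the ForeachWriter contract asks for.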