I can't do a transformation on an RDD[ConsumerRecord]


Problem description

I have a serialization problem with a ConsumerRecord recovered from a Kafka topic in a DStream. To illustrate my problem, I created the following example: I create a ConsumerRecord with a JSON value and put it inside an RDD, then I do a collect in order to print the result, and I get a serialization error. Here is the example:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object Exemple {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val conf = new SparkConf().setMaster("local[*]").setAppName("Exemple")
    val sc = new SparkContext(conf)

    // ConsumerRecord(topic, partition, offset, key, value): note that the topic
    // here is "" and the key is "myTopic", as the error output below confirms
    val consumerRecord: ConsumerRecord[String, String] =
      new ConsumerRecord("", 1, 2L, "myTopic", """{"id":1}""")
    val rdd = sc.parallelize(List(consumerRecord))
    val valueRDD = rdd.map(cr => cr.value)
    valueRDD.collect.foreach(println) // this line throws the error

  }
}

I get the following error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
[Stage 0:>                                                          (0 + 0) / 4]18/03/02 00:58:55 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/02 00:58:55 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/02 00:58:55 ERROR TaskSetManager: Failed to serialize task 3, not attempting to retry it.
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1}))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
    - field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1})))
    - writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
    - object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@694)
    - field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
    - object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 3))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/02 00:58:55 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_0.0 was not serializable
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 3, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1}))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
    - field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1})))
    - writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
    - object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@694)
    - field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
    - object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 3))
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at Exemple$.main(Exemple.scala:20)
    at Exemple.main(Exemple.scala)

Process finished with exit code 1

Do you have any idea?

Answer

org.apache.kafka.clients.consumer.ConsumerRecord is not serializable, so there are a few ways to solve this kind of problem:

  1. Serialize the class using Kryo serialization (a configuration sketch follows the snippet below).
  2. Declare the instance only within the lambda function passed in map (a related sketch closes this answer).
  3. Make the NotSerializable object static and create it once per machine.
  4. Call rdd.foreachPartition and create the NotSerializable object in there, like this:

rdd.foreachPartition(iter -> {
    NotSerializable notSerializable = new NotSerializable();
    // ...now process iter
});
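For option 1, a minimal sketch of what the Spark configuration could look like (an illustration of the idea, not tested against this exact example), reusing the SparkConf from the question:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf

// Option 1 sketch: switch Spark to Kryo and register ConsumerRecord, so that
// Kryo can serialize it even though it does not implement java.io.Serializable.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Exemple")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))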

For more details you can check this link: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
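
For the standalone example in the question specifically, a closely related workaround (in the spirit of options 2 and 3: keep the non-serializable object out of anything Spark has to ship) is to extract the record's serializable String value before the data ever enters an RDD. A minimal sketch, reusing sc and consumerRecord from the question:

// Parallelize only the String value, so no ConsumerRecord is ever serialized.
val valueRDD = sc.parallelize(List(consumerRecord.value))
valueRDD.collect.foreach(println) // prints {"id":1} without the exception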

