任务在Spark中产生了不可序列化的结果 [英] Task had a not serializable result in spark

查看:111
本文介绍了任务在Spark中产生了不可序列化的结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用cassandra驱动程序读取cassandra表.这是代码.

I am trying to read cassandra table using the cassandra driver to the spark. Here is the code.

val x = 1 to 2
val rdd = sc.parallelize(x)

val query = "Select data from testkeyspace.testtable where id=%d"

val cc = CassandraConnector(sc.getConf)

val res1 =
    rdd.map{ it => 
            cc.withSessionDo{ session =>
            session.execute( query.format(it))
        }
     }

res1.take(1).foreach(println)

但是我遇到了异常Task的结果无法序列​​化.

but I am getting the exception Task had a not serializable result.

  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 24.0 (TID 77) had a not serializable result: com.datastax.driver.core.ArrayBackedResultSet$SinglePage
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

如何解决?

推荐答案

在我们的转换中,不可序列化的对象是从Cassandra返回的结果,可对查询结果进行迭代.通常,您希望将该集合具体化为RDD.

The non-serializable object in our transformation is the result coming back from Cassandra, which is an iterable on the query result. You typically want to materialize that collection into the RDD.

一种方法是询问该查询产生的所有记录:

One way would be to ask all records resulting from that query:

session.execute( query.format(it)).all()

这篇关于任务在Spark中产生了不可序列化的结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆