RDD not serializable Cassandra/Spark connector Java API
Question
So I previously had some questions on how to query Cassandra using Spark in a Java Maven project, here: Querying Data in Cassandra via Spark in a Java Maven Project (http://stackoverflow.com/questions/25893945/querying-data-in-cassandra-via-spark-in-a-java-maven-project).
Well, my question was answered and it worked; however, I've now run into an issue (possibly a bug). I'm trying to use the DataStax Java API. Here is my code:
package com.angel.testspark.test2;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;

public class App
{
    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);
        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {
        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
    }

    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}
This is my error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Now I KNOW exactly where my error is. It is

System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));

because I need to convert the RDD to an array. However, the API documentation said I should be able to do this... this code is copied and pasted from the documentation. Why can I not serialize the RDD to an array?
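(As an aside, later Spark releases deprecate JavaRDD.toArray() in favor of collect(), which returns the same List. A minimal sketch of the print line using collect(), assuming the rdd variable from the code above; note that commons-lang3's StringUtils.join takes the collection first and the separator second:)

import java.util.List;

// Sketch only: collect() materializes the RDD's contents on the driver,
// exactly like the deprecated toArray() did.
List<String> rows = rdd.collect();
System.out.println("Data as Person beans:\n" + StringUtils.join(rows, "\n"));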
I've already inserted dummy data into my Cassandra instance using the insertions from the post linked above.
Also, a previous error that I solved was that I had to change all of my getters and setters to lowercase. When I used capitals in them, it produced an error. Why can't I use capitals in my getters and setters here?
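(For comparison, standard JavaBean naming capitalizes the first letter of the property after the get/set prefix. Whether the connector's column-to-property mapping accepts that form depends on the connector version, so the following is purely an illustration of the conventional style, not a claim about what this connector requires:)

// Conventional JavaBean accessors for the "fname" field: under JavaBeans
// rules, the property name is capitalized after the get/set prefix.
public String getFname() { return fname; }
public void setFname(String fname) { this.fname = fname; }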
Thanks,
Angel
Answer
Changing

public class App

to

public class App implements Serializable

should fix the error. Because a Java inner class keeps a reference to its outer class, your Function object will hold a reference to App. Since Spark needs to serialize your Function object, it requires that App be serializable as well.
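A minimal sketch of both fixes; option 1 is the change described above, and option 2 is an alternative (the class name PersonToString is illustrative) that replaces the anonymous inner class with a static nested class, which holds no implicit reference to App, so App never needs to be serialized at all:

// Option 1: make the enclosing class serializable, as described above.
public class App implements Serializable {
    // ... body unchanged ...
}

// Option 2 (alternative sketch): a named static nested class. Spark's
// Function interface extends Serializable, and a static class captures
// no hidden reference to App, so only this small object gets shipped.
public static class PersonToString implements Function<Person, String> {
    @Override
    public String call(Person person) throws Exception {
        return person.toString();
    }
}

// Used in createSchema as:
// JavaRDD<String> rdd = javaFunctions(sc)
//         .cassandraTable("tester", "empbyrole", Person.class)
//         .where("role=?", "IT Engineer")
//         .map(new PersonToString());

Option 2 is often preferable in practice: it keeps the serialized closure small and makes the serialization boundary explicit instead of dragging the whole driver-side object along.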