RDD not serializable Cassandra/Spark connector Java API


Problem description

So I previously had some questions on how to query Cassandra using Spark in a Java Maven project here: Querying Data in Cassandra via Spark in a Java Maven Project (http://stackoverflow.com/questions/25893945/querying-data-in-cassandra-via-spark-in-a-java-maven-project)

Well, my question was answered and it worked; however, I've run into an issue (possibly an issue). I'm now trying to use the DataStax Java API. Here is my code:

package com.angel.testspark.test2;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
    }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

And here is my error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Now I KNOW exactly where my error is. It is System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray())); because I need to convert the RDD to an array. However, the API documentation said I should be able to do this; this code is copied and pasted from the documentation. Why can I not serialize the RDD to an array?

I've already inserted dummy data into my Cassandra using the insertions in my post that I included in the link above.

Also, a previous error that I solved was when I changed all of my getters and setters to lowercase. When I used capitals in them, it produced an error. Why can't I use capitals in my getters and setters here?

Thanks,
Angel

Recommended answer

Changing public class App to public class App implements Serializable should fix the error. Because a Java inner class keeps a reference to its outer class, your Function object holds a reference to App. Since Spark needs to serialize your Function object, it requires that App also be serializable.
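
For concreteness, here is a minimal sketch of how the relevant parts of App could look after the fix. The static nested PersonToString class is an alternative that is not part of the original code: being static, it holds no reference to the enclosing App instance, so it avoids the capture altogether (note that in older Spark releases where Function is an abstract class rather than an interface, it would need extends instead of implements):

import java.io.Serializable;
// (remaining imports as in the original file)

// The accepted fix: App becomes serializable, so the reference to App.this
// that the anonymous Function keeps can be serialized along with the task.
public class App implements Serializable {

    // Alternative sketch (not in the original code): a static nested mapper
    // captures no reference to App, so Spark only serializes this small object.
    public static class PersonToString implements Function<Person, String> {
        @Override
        public String call(Person person) throws Exception {
            return person.toString();
        }
    }

    private void createSchema(JavaSparkContext sc) {
        JavaRDD<String> rdd = javaFunctions(sc)
                .cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer")
                .map(new PersonToString());
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
    }

    // Person bean, constructor, run() and main() unchanged from the question.
}

Either change on its own is enough: making App serializable lets the captured App.this reference be shipped with the task, while the static nested mapper removes the capture so nothing non-serializable ends up in the closure.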

