Not Serializable exception when integrating Spark SQL and Spark Streaming


Problem description

This is my source code, in which I'm getting some data from the server side, which keeps generating a stream of data. For each RDD, I'm applying a SQL schema, and once that table is created I'm trying to select something from this DStream.

    List<String> males = new ArrayList<String>();
    JavaDStream<String> data = streamingContext.socketTextStream("localhost", (port));
    data.print();
    System.out.println("Socket connection established to read data from Subscriber Server");

    JavaDStream<SubscriberData> streamData = data
            .map(new Function<String, SubscriberData>() {
                public SubscriberData call(String record) {
                    String[] stringArray = record.split(",");
                    SubscriberData subscriberData = new SubscriberData();
                    subscriberData.setMsisdn(stringArray[0]);
                    subscriberData.setSubscriptionType(stringArray[1]);
                    subscriberData.setName(stringArray[2]);
                    subscriberData.setGender(stringArray[3]);
                    subscriberData.setProfession(stringArray[4]);
                    subscriberData.setMaritalStatus(stringArray[5]);
                    return subscriberData;
                }
            });

    streamData.foreachRDD(new Function<JavaRDD<SubscriberData>, Void>() {
        public Void call(JavaRDD<SubscriberData> rdd) {
            JavaSQLContext sqlContext = new JavaSQLContext(sc);
            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, SubscriberData.class);
            subscriberSchema.registerAsTable("SUBSCRIBER_DIMENSION");
            System.out.println("all data");

            JavaSchemaRDD names = sqlContext.sql("SELECT msisdn FROM SUBSCRIBER_DIMENSION WHERE GENDER='Male'");
            System.out.println("afterwards");

            List<String> males = new ArrayList<String>();
            males = names.map(new Function<Row, String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();

            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });

    streamingContext.start();

But it throws this Not Serializable exception, although the classes I'm using do implement Serializable.

14/11/06 12:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1415258720000 ms.1
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:75)
at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:206)
at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:1)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 20 more

Recommended answer

SparkContext is not serializable: it is only usable on the driver and should NOT be included in any closure. I'm afraid that support for SQL on Spark Streaming is only at the research level at the moment. See this presentation from the Spark Summit for the details.
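Judging from the trace (the failure is Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext, thrown while Spark cleans the closure passed to names.map), the anonymous Function<Row, String> created inside foreachRDD implicitly holds a reference to its enclosing instance, which in turn holds the JavaSparkContext sc. One way to break that chain is to move the row-mapping function into a static nested (or top-level) class, which captures no enclosing instance. A minimal sketch against the Spark 1.1-era Java API from the question; the class name ExtractMsisdn is hypothetical:

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.api.java.Row;

    // Hypothetical helper: a static nested (or top-level) class holds no
    // reference to an enclosing instance, so serializing it cannot drag the
    // non-serializable JavaSparkContext along with it. Spark's Function
    // interface already extends java.io.Serializable.
    static class ExtractMsisdn implements Function<Row, String> {
        public String call(Row row) {
            return row.getString(0); // the msisdn column selected by the query
        }
    }

Inside foreachRDD the projection then becomes males = names.map(new ExtractMsisdn()).collect(); which no longer closes over anything from the enclosing class.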

To create the intended RDD of the ids of male subscribers, you can use map and filter:

    maleSubscribers = subscribers.filter(subsc => subsc.getGender == "Male")
                                 .map(subsc => subsc.getMsisdn)
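
That snippet is Scala-style shorthand. In the Java 7-style API the question uses, the equivalent would look roughly like this (a sketch assuming the SubscriberData getters from the question, written in a static context so the anonymous classes capture no enclosing instance):

    JavaDStream<String> maleMsisdns = streamData
            .filter(new Function<SubscriberData, Boolean>() {
                public Boolean call(SubscriberData subscriber) {
                    // keep only the male subscribers
                    return "Male".equals(subscriber.getGender());
                }
            })
            .map(new Function<SubscriberData, String>() {
                public String call(SubscriberData subscriber) {
                    // project each subscriber down to its msisdn
                    return subscriber.getMsisdn();
                }
            });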
