java.lang.ClassCastException using lambda expressions in spark job on remote server


Problem description


I'm trying to build a web API for my Apache Spark jobs using the sparkjava.com framework. My code is:

@Override
public void init() {
    get("/hello",
            (req, res) -> {
                String sourcePath = "hdfs://spark:54310/input/*";

                SparkConf conf = new SparkConf().setAppName("LineCount");
                conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" });
                File configFile = new File("config.properties");

                String sparkURI = "spark://hamrah:7077";

                conf.setMaster(sparkURI);
                conf.set("spark.driver.allowMultipleContexts", "true");
                JavaSparkContext sc = new JavaSparkContext(conf);

                @SuppressWarnings("resource")
                JavaRDD<String> log = sc.textFile(sourcePath);

                JavaRDD<String> lines = log.filter(x -> {
                    return true;
                });

                return lines.count();
            });
}


If I remove the lambda expression, or put it inside a simple jar rather than a web service (somehow a servlet), it runs without any error. But using a lambda expression inside a servlet results in this exception:

15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


P.S.: I tried combinations of Jersey and SparkJava with Jetty, Tomcat, and Resin, and all of them led me to the same result.

Answer


What you have here is a follow-up error which masks the original error.


When lambda instances are serialized, they use writeReplace to dissolve their JRE specific implementation from the persistent form which is a SerializedLambda instance. When the SerializedLambda instance has been restored, its readResolve method will be invoked to reconstitute the appropriate lambda instance. As the documentation says, it will do so by invoking a special method of the class which defined the original lambda (see also this answer). The important point is that the original class is needed and that’s what’s missing in your case.
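
The `writeReplace` indirection described above can be observed directly with reflection. The sketch below is illustrative only (the class name `WriteReplaceDemo` is made up, and relying on the generated class's private method is an implementation detail, tested here on the assumption of a Java 8-style JDK):

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class WriteReplaceDemo {
    public static void main(String[] args) throws Exception {
        Runnable r = (Runnable & Serializable) () -> {};

        // A serializable lambda's generated class declares a private
        // writeReplace() that returns a SerializedLambda.
        Method writeReplace = r.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        Object replacement = writeReplace.invoke(r);
        System.out.println(replacement.getClass().getName());

        // SerializedLambda records which class captured (defined) the lambda;
        // readResolve needs exactly that class to rebuild the instance.
        SerializedLambda sl = (SerializedLambda) replacement;
        System.out.println(sl.getCapturingClass());
    }
}
```

The capturing class printed at the end is the class that must be present on the deserializing side; when it is missing, `readResolve` cannot run and the raw `SerializedLambda` is left behind.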


But there's a …special… behavior of the ObjectInputStream. When it encounters an exception, it doesn't bail out immediately. It records the exception and continues the process, marking all objects currently being read, and thus depending on the erroneous object, as erroneous as well. Only at the end of the process will it throw the original exception it encountered. What makes it so strange is that it will also continue trying to set the fields of these objects. But when you look at the method ObjectInputStream.readOrdinaryObject, line 1806:

…
    if (obj != null &&
        handles.lookupException(passHandle) == null &&
        desc.hasReadResolveMethod())
    {
        Object rep = desc.invokeReadResolve(obj);
        if (unshared && rep.getClass().isArray()) {
            rep = cloneArray(rep);
        }
        if (rep != obj) {
            handles.setObject(passHandle, obj = rep);
        }
    }

    return obj;
}


you see that it doesn't call the readResolve method when lookupException reports a non-null exception. But when the substitution did not happen, it's not a good idea to continue trying to set the field values of the referrer; yet that's exactly what happens here, hence producing a ClassCastException.


You can easily reproduce the problem:

// Each class goes in its own source file in package test; each needs: import java.io.*;
public class Holder implements Serializable {
    Runnable r;
}
public class Defining {
    public static Holder get() {
        final Holder holder = new Holder();
        holder.r=(Runnable&Serializable)()->{};
        return holder;
    }
}
public class Writing {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException {
        try(FileOutputStream os=new FileOutputStream(f);
            ObjectOutputStream   oos=new ObjectOutputStream(os)) {
            oos.writeObject(Defining.get());
        }
        System.out.println("written to "+f);
    }
}
public class Reading {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException, ClassNotFoundException {
        try(FileInputStream is=new FileInputStream(f);
            ObjectInputStream ois=new ObjectInputStream(is)) {
            Holder h=(Holder)ois.readObject();
            System.out.println(h.r);
            h.r.run();
        }
        System.out.println("read from "+f);
    }
}


Compile these four classes and run Writing. Then delete the class file Defining.class and run Reading. Then you will get a

Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)


(Tested with 1.8.0_20)
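
For contrast, here is a minimal sketch of the success case: when the capturing class is still present at read time, SerializedLambda's readResolve can reconstitute the lambda and the field assignment succeeds. The class and interface names below are hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTrip {
    // Same idea as (Runnable & Serializable) in the Holder example
    interface SerRunnable extends Runnable, Serializable {}

    public static void main(String[] args) throws Exception {
        SerRunnable r = () -> System.out.println("lambda ran");

        // Serialize the lambda; what is actually written is its
        // SerializedLambda replacement form.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(r);
        }

        // Deserialize in the same JVM: RoundTrip.class is available, so
        // readResolve can reconstitute a working lambda instance.
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            SerRunnable back = (SerRunnable) ois.readObject();
            back.run();
        }
    }
}
```

Deleting RoundTrip.class between writing and reading (as with Defining.class above) would reproduce the failure.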


The bottom line is that you may forget about this serialization quirk once you understand what's happening; all you have to do to solve your problem is to make sure that the class which defined the lambda expression is also available in the runtime where the lambda is deserialized.


Example for a Spark job run directly from the IDE (spark-submit distributes the jar by default):

SparkConf sconf = new SparkConf()
  .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory")
  .set("spark.eventLog.enabled", "true")
  .setJars(new String[]{"/path/to/jar/with/your/class.jar"})
  .setMaster("spark://spark.standalone.uri:7077");

