Spark UDF function fails on Standalone Spark


Problem description

I have a Spring Boot Java application, myapp.jar, with a UDF function.

SparkConfiguration.java

 // sparkMaster and sparkJars are configuration properties injected into this class.
 public SparkSession sparkSession() {
     SparkSession session = SparkSession
             .builder()
             .appName("uniloader-spark-master")
             .master(sparkMaster)
             .config("spark.jars", sparkJars) // ship the application jar to the executors
             .getOrCreate();
     log.info("Spark jars for control: " + sparkJars);
     session.sqlContext().udf().register("to_integer", new ToIntegerUdf(), DataTypes.IntegerType);
     return session;
 }

ToIntegerUdf.java

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.regex.Pattern;

import org.apache.spark.sql.api.java.UDF2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ToIntegerUdf implements UDF2<Object, Object, Integer> {

    private static final long serialVersionUID = 165436543635L;

    private static final Logger log = LoggerFactory.getLogger(ToIntegerUdf.class);

    @Override
    public Integer call(Object valueObj, Object delimiterObj) {
        try {
            log.info("value = " + valueObj);
            log.info("delimiter = " + delimiterObj);
            if (valueObj == null || delimiterObj == null) {
                return null;
            }
            String value = String.valueOf(valueObj);
            String delimiter = (String) delimiterObj;
            // Replace the custom decimal delimiter with "." so the value parses as a number.
            String doubleAsString = value.replaceAll(Pattern.quote(delimiter), ".");
            return new BigDecimal(doubleAsString).setScale(0, RoundingMode.HALF_UP).intValue();
        } catch (Exception e) {
            log.error("Error while to_integer transformation for " + valueObj + ". Details:", e);
            return null;
        }
    }
}
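
For context, here is a minimal sketch of how the registered UDF could be invoked from Spark SQL. The input path, view name, and "amount" column are made up for illustration; session is the SparkSession built above.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical input: a CSV with an "amount" column holding values such as "1,5".
Dataset<Row> df = session.read().option("header", "true").csv("/tmp/input.csv");
df.createOrReplaceTempView("raw_values");

// "1,5" with delimiter "," becomes "1.5", which rounds half-up to 2.
session.sql("SELECT to_integer(amount, ',') AS amount_int FROM raw_values").show();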

sparkJars contains the path to myJar.jar.

The application is built with Maven. The Spark library version is 3.0.2 and the Scala version is 2.12.10.

When I run the application on Spark Standalone 3.0.2, I get this error:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

In the Spark worker log I can see that the worker fetches myJar:

21/03/23 19:33:24 INFO Executor: Fetching spark://demo.phoenixit.ru:39597/jars/myJar.jar with timestamp 1616517199949

I think the problem is with either a dependency or the way the application is submitted, but I have no idea how to fix it.

Recommended answer

I have fixed it.

The problem was with the spring-boot-maven-plugin. Spark Standalone can't load the UDF class if the application is packaged with this plugin, presumably because the Spring Boot repackaging nests the application classes under BOOT-INF/classes, where the executors' standard classloader cannot find them when deserializing the UDF.

When I replaced the plugin with the maven-shade-plugin, everything worked correctly.
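
For reference, a minimal maven-shade-plugin configuration along these lines might look as follows. This is a sketch, not the exact POM from the question: the plugin version and main class are placeholders, and the transformers merge Spring's service and factories files so the shaded jar still starts as a Spring Boot application.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Merge META-INF/services descriptors from all dependencies -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <!-- Spring Boot auto-configuration needs spring.factories merged, not overwritten -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.factories</resource>
                    </transformer>
                    <!-- Placeholder main class; replace with the real Spring Boot application class -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.example.MyApp</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>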
