Define Spark UDF by reflection on a String

Problem Description

I am trying to define a UDF in Spark (2.0) from a string containing a Scala function definition. Here is the snippet:

// Build a runtime toolbox that can compile Scala source on the fly
val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()

// Compile the function from its source string and wrap it as a UDF
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

This gives me an error:

  Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

However, when I define the UDF as:

val f = udf((s:String) => 5)

it works just fine. What is the issue here? The end objective is to take a string that contains the definition of a Scala function and use it as a UDF.

Recommended Answer

As Giovanny observed, the problem lies in the class loaders being different (you can investigate this further by calling .getClass.getClassLoader on any object). Then, when the workers try to deserialize your reflected function, all hell breaks loose.
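
A quick way to see the mismatch for yourself (a diagnostic sketch, assuming the toolbox from the snippet above is still in scope in the shell):

val inlineFunc = (s: String) => 5
val reflectedFunc = toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]

println(inlineFunc.getClass.getClassLoader)    // e.g. the REPL class loader, whose classes the executors can fetch
println(reflectedFunc.getClass.getClassLoader) // a ToolBox-internal loader whose classes never reach the executors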

Here is a solution that does not involve any class loader hackery. The idea is to move the reflection step to the workers. We'll end up having to redo the reflection step, but only once per worker. I think this is pretty optimal - even if you did the reflection only once on the master node, you would have to do a fair bit of work per worker to get them to recognize the function.

val f = udf {
  new Function1[String,Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox() // initialized lazily, i.e. on the worker
    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

Then, calling sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show works just fine.

Feel free to comment out the println - it is just an easy way of counting how many times the reflection happened. In spark-shell --master 'local' that's only once, but in spark-shell --master 'local[2]' it's twice.

The UDF gets evaluated immediately, but it never gets used until it reaches the worker nodes, so the lazy values toolbox and func only get evaluated on the workers. Furthermore, since they are lazy, they only ever get evaluated once per worker.
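
The original question's end goal was to take an arbitrary string holding a Scala function definition and use it as a UDF. Here is a hedged sketch of that generalization using the same lazy, once-per-worker pattern; the helper name udfFromString and the fixed String => Int signature are illustrative assumptions, not part of the answer above:

import org.apache.spark.sql.functions.{udf, col} // already in scope in spark-shell

// Hypothetical helper: compiles a "(s:String) => Int" source string into a UDF.
// Only the code string is captured by the closure, so it serializes trivially;
// each worker compiles it once, on first use, thanks to the lazy val.
def udfFromString(code: String) = udf {
  new Function1[String, Int] with Serializable {
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val func = {
      val toolbox = currentMirror.mkToolBox()
      toolbox.eval(toolbox.parse(code)).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

val g = udfFromString("(s:String) => s.length + 5")
sc.parallelize(Seq("1","5")).toDF.select(g(col("value"))).show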
