Define spark udf by reflection on a String


Question

I am trying to define a udf in Spark (2.0) from a string containing a Scala function definition. Here is the snippet:

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

This gives me an error:

  Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

However, when I define the udf as:

val f = udf((s:String) => 5)

it works just fine. What is the issue here? The end objective is to take a string which has the definition of a Scala function and use it as a udf.

Answer

As Giovanny observed, the problem lies in the class loaders being different (you can investigate this more by calling .getClass.getClassLoader on whatever object). Then, when the workers try to deserialize your reflected function, all hell breaks loose.
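For instance, a quick check in spark-shell is to compare the loader of an ordinary lambda with the loader of the toolbox-compiled one (this reuses the toolbox from the question's snippet; plain and reflected are just illustrative names):

val plain = (s: String) => 5
val reflected = toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]

// The two loaders typically differ: the lambda comes from the REPL's class loader,
// while the toolbox compiles into its own loader, which the workers know nothing about.
println(plain.getClass.getClassLoader)
println(reflected.getClass.getClassLoader)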

Here is a solution that does not involve any class loader hackery. The idea is to move the reflection step to the workers. We'll end up having to redo the reflection step, but only once per worker. I think this is pretty optimal - even if you did the reflection only once on the master node, you would have to do a fair bit of work per worker to get them to recognize the function.

val f = udf {
  new Function1[String,Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

Then, calling sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show works just fine.

Feel free to comment out the println - it is just an easy way of counting how many times the reflection happened. In spark-shell --master 'local' that's only once, but in spark-shell --master 'local[2]' it's twice.

The UDF gets evaluated immediately, but it never gets used until it reaches the worker nodes, so the lazy values toolbox and func only get evaluated on the workers. Furthermore, since they are lazy, they only ever get evaluated once per worker.
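To reach the end objective stated in the question (an arbitrary function-definition string), the same pattern can be parameterized. The following is a minimal sketch under the assumption that the string always evaluates to a String => Int and that you are running in spark-shell as in the question; udfFromString is a hypothetical helper name, not a Spark API:

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper: wraps a function-definition string in a serializable Function1,
// so each worker lazily compiles the string once before first use.
def udfFromString(funcDef: String) = udf {
  new Function1[String, Int] with Serializable {
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = toolbox.eval(toolbox.parse(funcDef)).asInstanceOf[String => Int]

    def apply(s: String): Int = func(s)
  }
}

// Usage: same behaviour as the hard-coded example above.
val f = udfFromString("(s:String) => 5")
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show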
