Spark UDF how to convert Map to column


Question

I am using an Apache Zeppelin notebook, so Spark is basically running in interactive mode. I can't use a closure variable here, because Zeppelin throws org.apache.spark.SparkException: Task not serializable as it tries to serialize the whole paragraph (a bigger closure).

So without the closure approach, the only option I have is to pass the map as a column to the UDF.

I have the following map, collected from a paired RDD:

final val idxMap = idxMapRdd.collectAsMap

which is being used in one of the Spark transformations here:

import scala.collection.mutable.WrappedArray

def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = {
    predictions.array.map(idxMap.getOrElse(_, "Other"))
}
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions, idxMap) }

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap))) 

But with the lit(idxMap) statement I got the following error:

java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap
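
As an aside, if your Spark version is 2.2 or later, typedLit (unlike lit) does support Map values; a minimal sketch, assuming Spark 2.2+:

import org.apache.spark.sql.functions.typedLit

// typedLit (Spark 2.2+) can build a MapType literal column from a Scala Map.
// toMap converts the collection.Map returned by collectAsMap into an immutable Map.
val idxMapCol = typedLit(idxMap.toMap)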

So I tried creating the column using the following:

val colmap = map(idxMapArr.map(lit _): _*)

But I got the following error:

<console>:139: error: type mismatch;
 found   : Iterable[org.apache.spark.sql.Column]
 required: Seq[org.apache.spark.sql.Column]
       val colmap =  map(idxMapArr.map(lit _): _*)
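
For reference, a sketch of one way this construction can be made to compile (assuming idxMapArr is the Map[Double, String] collected above): map(...) expects a flat Seq of alternating key and value columns, so each entry has to be split into two lit columns and the Iterable converted with toSeq:

import org.apache.spark.sql.functions.{lit, map}

// Flatten each (key, value) entry into lit(key), lit(value) and
// convert the resulting collection to a Seq, as map(...) requires.
val colmap = map(
    idxMapArr.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*
)

The UDF then receives this as an ordinary MapType column.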

Closure approach (for completeness):

def predictionStrUDF2(idxMapArr: scala.collection.Map[Double, String]) = {
    udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr))
}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions")))

This compiles, but then when I do cvmlPredictionsStr.show I get the following. I think this is due to the interactive nature of Zeppelin:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 62 elided
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS
Serialization stack:
    - object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: com.github.fommil.netlib.F2jBLAS@294770d3)
    - field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS)
    - object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f)
    - field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator)
    - object (class $iw, $iw@556a6aed)
    - field (class: $iw, name: $iw, type: class $iw)

Answer

The question title is about Spark UDFs, but it seems the real question here is how to avoid the closure serialization problem that some interactive environments exhibit.

From your description of the problem, it sounds like the following doesn't work if executed directly in one of your notebook cells:

val x = 5
sc.parallelize(1 to 10).filter(_ > x).collect()

This is likely because x is a class member of the cell object; when the lambda captures x, it attempts to serialize the entire cell object. The cell object isn't serializable, and the result is a messy exception. This problem can be avoided with a wrapper object. Note that there is likely a slicker way to declare this wrapper (perhaps just nesting inside braces is sufficient):

object Wrapper {
    def f(): Unit = {
        val x = 5
        sc.parallelize(1 to 10).filter(_ > x).collect()
    }
}
Wrapper.f()
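
Applied to the code from the question, a sketch of the same pattern (the object name UdfWrapper is made up here, and this is untested against your notebook):

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions.{col, udf}

object UdfWrapper {
    // The UDF's closure captures only the idxMapArr parameter,
    // not the enclosing notebook cell, so it serializes cleanly.
    def predictionStrUDF(idxMapArr: scala.collection.Map[Double, String]) =
        udf { (predictions: WrappedArray[Double]) =>
            predictions.array.map(idxMapArr.getOrElse(_, "Other"))
        }
}

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr",
    UdfWrapper.predictionStrUDF(idxMapArr)(col("predictions")))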

You may still have questions after resolving this issue, but currently the question touches on too many different subtopics. Another explanation of the closure serialization problem is available here.

