Spark UDF how to convert Map to column


Problem description

I am using an Apache Zeppelin notebook, so Spark is essentially running in interactive mode. I can't use a closure variable here, because Zeppelin throws org.apache.spark.SparkException: Task not serializable as it tries to serialize the whole paragraph (a larger closure).

So, without the closure approach, the only option I have is to pass the map to the UDF as a column.

I have the following map, collected from a paired RDD:

final val idxMap = idxMapRdd.collectAsMap

It is used in one of the Spark transformations, shown here:

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions.{col, lit, udf}

// Map each predicted label index to its string label, defaulting to "Other".
def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = {
    predictions.array.map(idxMap.getOrElse(_, "Other"))
}
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions, idxMap) }

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap))) 

But with the lit(idxMap) expression, I get the following error:

java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap

So I tried creating the column using the following:

val colmap = map(idxMapArr.map(lit _): _*)

But I got the following error:

<console>:139: error: type mismatch;
 found   : Iterable[org.apache.spark.sql.Column]
 required: Seq[org.apache.spark.sql.Column]
       val colmap =  map(idxMapArr.map(lit _): _*)
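
For reference, the type mismatch arises because mapping over a Scala Map produces an Iterable, while map(...) expects a Seq of alternating key and value columns. A minimal sketch of one way such a column could be built (assuming the idxMapArr used in the snippets above; this is not part of the original question):

import org.apache.spark.sql.functions.{lit, map}

// Flatten the map into alternating key/value literal columns and materialize a Seq,
// so the varargs expansion type-checks.
val colmap = map(idxMapArr.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*)

On Spark 2.2 and later, typedLit(idxMapArr) should also be able to produce the map literal directly.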

Closure approach (for completeness):

def predictionStrUDF2( idxMapArr: scala.collection.Map[Double,String]) = {
     udf((predictions: WrappedArray[Double] ) =>  labelStr(predictions, idxMapArr))
}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions")))

This compiles, but when I run cvmlPredictionsStr.show I get the following. I think this is due to the interactive nature of Zeppelin:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 62 elided
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS
Serialization stack:
    - object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: com.github.fommil.netlib.F2jBLAS@294770d3)
    - field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS)
    - object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f)
    - field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator)
    - object (class $iw, $iw@556a6aed)
    - field (class: $iw, name: $iw, type: class $iw)

Solution

The question title is about Spark UDFs, but it seems the real question here is how to avoid the closure-serialization problem that some interactive environments exhibit.

From your description of the problem, it sounds like the following doesn't work when executed directly in one of your notebook cells:

val x = 5
sc.parallelize(1 to 10).filter(_ > x).collect()

This is likely because x is a class member of the cell object; when the lambda captures x, it attempts to serialize the entire cell object. The cell object isn't serializable, and the result is a messy exception. This problem can be avoided with a wrapper object. Note that there is likely a slicker way to declare this wrapper (perhaps just nesting inside braces is sufficient).

object Wrapper {
    def f() {
        val x = 5
        sc.parallelize(1 to 10).filter(_ > x).collect()
    }
}
Wrapper.f()
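
As a rough sketch of how the same idea might carry over to the original UDF (assuming the cvmlPrediction DataFrame, the predictions column, and the idxMapArr map from the question; this is not part of the accepted answer), the lookup logic can be inlined so the lambda captures only the local map parameter and never the notebook cell object:

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions.{col, udf}

object PredictionUdf {
    // Build the UDF from a plain map; the lambda closes over `idxMap` only.
    def make(idxMap: scala.collection.Map[Double, String]) =
        udf { (predictions: WrappedArray[Double]) =>
            predictions.array.map(idxMap.getOrElse(_, "Other"))
        }
}

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", PredictionUdf.make(idxMapArr)(col("predictions")))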

You may still have questions after resolving this issue, but currently the question touches on too many different subtopics. Another explanation of the closure-serialization problem is available here.
