Spark accumulator empty when used in UDF
Question
I was optimizing my Spark process and trying to use a UDF with an accumulator. I have gotten the accumulator to work on its own, and wanted to see whether I would get any speedup using a UDF. Instead, when I wrap the accumulator in the UDF, it remains empty. Am I doing something wrong in particular? Is something going on with lazy execution, where even with my .count it is still not executing?
Input:
0,[0.11,0.22]
1,[0.22,0.33]
Output:
(0,0,0.11),(0,1,0.22),(1,0,0.22),(1,1,0.33)
Code
val accum = new MapAccumulator2d()
val session = SparkSession.builder().getOrCreate()
session.sparkContext.register(accum)

// Does not work - empty accumulator
val rowAccum = udf((itemId: Int, item: mutable.WrappedArray[Float]) => {
  val map = item
    .zipWithIndex
    .map(ff => ((itemId, ff._2), ff._1.toDouble))
    .toMap
  accum.add(map)
  itemId
})
dataFrame.select(rowAccum(col("itemId"), col("jaccardList"))).count

// Works
dataFrame.foreach(f => {
  val map = f.getAs[mutable.WrappedArray[Float]](1)
    .zipWithIndex
    .map(ff => ((f.getInt(0), ff._2), ff._1.toDouble))
    .toMap
  accum.add(map)
})
val list = accum.value.toList.map(f => (f._1._1, f._1._2, f._2))
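For reference, the per-row map construction inside the UDF can be checked in plain Scala without Spark; `rowToMap` below is a hypothetical helper name that mirrors the UDF body:

```scala
// Hypothetical helper mirroring the UDF body: builds the
// ((itemId, index), value) map for a single row's float array.
def rowToMap(itemId: Int, item: Seq[Float]): Map[(Int, Int), Double] =
  item.zipWithIndex
    .map { case (value, idx) => ((itemId, idx), value.toDouble) }
    .toMap

// First input row: 0,[0.11,0.22]
val row0 = rowToMap(0, Seq(0.11f, 0.22f))
// row0 has keys (0,0) and (0,1), matching the expected output pairs
```

This confirms the transformation itself is correct; the empty accumulator is a question of when (or whether) Spark actually executes the UDF, which the answer below addresses.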
Answer
Looks like the only issue here is using count to "trigger" the lazily-evaluated UDF: Spark is "smart" enough to realize that the select operation can't change the result of count, and therefore doesn't really execute the UDF. Choosing a different operation (e.g. collect) shows that the UDF works and updates the accumulator.
Here's a (more concise) example:
val accum = sc.longAccumulator
val rowAccum = udf((itemId: Int) => { accum.add(itemId); itemId })
val dataFrame = Seq(1,2,3,4,5).toDF("itemId")
dataFrame.select(rowAccum(col("itemId"))).count() // won't trigger UDF
println(s"RESULT: ${accum.value}") // prints 0
dataFrame.select(rowAccum(col("itemId"))).collect() // triggers UDF
println(s"RESULT: ${accum.value}") // prints 15