Spark accumulator empty when used in UDF

Problem description

I was working on optimizing my Spark process and was trying to use a UDF with an accumulator. I have gotten the accumulator to work on its own, and was looking to see if I would get any speedup using a UDF. But instead, when I wrap the accumulator in the UDF, it remains empty. Am I doing something wrong in particular? Is something going on with lazy execution where, even with my .count, it is still not executing?

Input:

0,[0.11,0.22]
1,[0.22,0.33]

Output:

(0,0,0.11),(0,1,0.22),(1,0,0.22),(1,1,0.33)

Code

  import scala.collection.mutable
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, udf}

  val accum = new MapAccumulator2d()
  val session = SparkSession.builder().getOrCreate()
  session.sparkContext.register(accum)
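
  // dataFrame is assumed to have the schema (itemId: Int, jaccardList: Array[Float]),
  // matching the input shown above; its construction is not part of the question.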

  //Does not work - Empty Accumulator
  val rowAccum = udf((itemId: Int, item: mutable.WrappedArray[Float]) => {
    val map = item
      .zipWithIndex
      .map(ff => {
        ((itemId, ff._2), ff._1.toDouble)
      }).toMap
    accum.add(map)
    itemId
  })
  dataFrame.select(rowAccum(col("itemId"), col("jaccardList"))).count

  //Works
  dataFrame.foreach(f => {
    val map = f.getAs[mutable.WrappedArray[Float]](1)
      .zipWithIndex
      .map(ff => {
        ((f.getInt(0), ff._2), ff._1.toDouble)
      }).toMap
    accum.add(map)
  })

  val list = accum.value.toList.map(f => (f._1._1, f._1._2, f._2))
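
For reference, MapAccumulator2d is not shown in the question. Below is a minimal sketch of what such a class might look like, assuming it is an AccumulatorV2 that merges partial (Int, Int) -> Double maps; this is a hypothetical reconstruction, not the asker's actual implementation:

import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical sketch: folds per-task (row, col) -> value maps into one map.
class MapAccumulator2d
    extends AccumulatorV2[Map[(Int, Int), Double], Map[(Int, Int), Double]] {
  private val underlying = mutable.Map.empty[(Int, Int), Double]

  override def isZero: Boolean = underlying.isEmpty

  override def copy(): MapAccumulator2d = {
    val acc = new MapAccumulator2d
    acc.underlying ++= underlying
    acc
  }

  override def reset(): Unit = underlying.clear()

  // Later values for a duplicate key overwrite earlier ones; change this
  // if duplicates should be summed instead.
  override def add(v: Map[(Int, Int), Double]): Unit = underlying ++= v

  override def merge(
      other: AccumulatorV2[Map[(Int, Int), Double], Map[(Int, Int), Double]]): Unit =
    underlying ++= other.value

  override def value: Map[(Int, Int), Double] = underlying.toMap
}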

Answer

Looks like the only issue here is using count to "trigger" the lazily-evaluated UDF: Spark is "smart" enough to realize that the select operation can't change the result of count and therefore doesn't really execute the UDF. Choosing a different operation (e.g. collect) shows that the UDF works and updates the accumulator.

Here's a (more concise) example:

// Assumes spark-shell, where sc and spark.implicits._ are already in scope;
// in a standalone app, import spark.implicits._ from your SparkSession first.
val accum = sc.longAccumulator

val rowAccum = udf((itemId: Int) => { accum.add(itemId); itemId })

val dataFrame = Seq(1,2,3,4,5).toDF("itemId")

dataFrame.select(rowAccum(col("itemId"))).count()   // won't trigger UDF
println(s"RESULT: ${accum.value}") // prints 0

dataFrame.select(rowAccum(col("itemId"))).collect() // triggers UDF 
println(s"RESULT: ${accum.value}") // prints 15
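
As a side note (not from the original answer, just a common pattern): if you want to force the UDF without pulling all rows to the driver, a no-op foreach also works as the triggering action. Keep in mind that accumulator updates made inside transformations such as a UDF can be re-applied on task retries; Spark only guarantees exactly-once accumulator updates for updates performed inside actions.

// Forces evaluation without collecting rows to the driver.
dataFrame.select(rowAccum(col("itemId"))).foreach(_ => ())
println(s"RESULT: ${accum.value}") // prints 15, assuming accum.reset() was called first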
