Spark UDAF 与 ArrayType 作为 bufferSchema 性能问题 [英] Spark UDAF with ArrayType as bufferSchema performance issues

查看:36
本文介绍了Spark UDAF 与 ArrayType 作为 bufferSchema 性能问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在研究一个返回元素数组的 UDAF.

I'm working on a UDAF that returns an array of elements.

每次更新的输入是索引和值的元组.

The input for each update is a tuple of index and value.

UDAF 的作用是对同一索引下的所有值求和.

What the UDAF does is to sum all the values under the same index.

示例:

对于 input(index,value) : (2,1), (3,1), (2,3)

For input(index,value) : (2,1), (3,1), (2,3)

应该返回 (0,0,4,1,...,0)

should return (0,0,4,1,...,0)

逻辑工作正常,但我的更新方法有问题,我的实现仅每行更新 1 个单元格,但该方法中的最后一个分配实际上复制整个数组 - 这是多余的并且非常耗时.

The logic works fine, but I have an issue with the update method, my implementation only updates 1 cell for each row, but the last assignment in that method actually copies the entire array - which is redundant and extremely time consuming.

仅此任务就占了我 98% 的查询执行时间.

我的问题是,我怎样才能减少那个时间?是否可以在缓冲区数组中分配 1 个值而不必替换整个缓冲区?

My question is, how can I reduce that time? Is it possible to assign 1 value in the buffer array without having to replace the entire buffer?

P.S.:我正在使用 Spark 1.6,我无法很快升级它,所以请坚持使用适用于此版本的解决方案.

P.S.: I'm working with Spark 1.6, and I cannot upgrade it anytime soon, so please stick to solution that would work with this version.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}

推荐答案

TL;DR 要么不使用 UDAF,要么使用原始类型代替 ArrayType.

TL;DR Either don't use UDAF or use primitive types in place of ArrayType.

没有UserDefinedFunction

这两种解决方案都应该跳过内部和外部表示之间的昂贵杂耍.

Both solutions should skip expensive juggling between internal and external representation.

使用标准聚合和pivot

这使用标准的 SQL 聚合.虽然在内部进行了优化,但当键的数量和数组的大小增加时可能会很昂贵.

This uses standard SQL aggregations. While optimized internally it might be expensive when number of keys and size of the array grow.

给定输入:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

您可以:

import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))

+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

使用带有 combineByKey/aggregateByKey 的 RDD API.

Using RDD API with combineByKey / aggregateByKey.

普通的 byKey 聚合与可变缓冲区.没有花里胡哨,但在广泛的输入范围内应该表现得相当好.如果你怀疑输入是稀疏的,你可以考虑更有效的中间表示,比如可变的 Map.

Plain old byKey aggregation with mutable buffer. No bells and whistles but should perform reasonably well with wide range of inputs. If you suspect input to be sparse, you may consider more efficient intermediate representation, like mutable Map.

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
    { case (acc, (index, value)) => { acc(index) += value; acc }},
    (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
  ).toDF

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

UserDefinedFunction 与基本类型一起使用

Using UserDefinedFunction with primitive types

据我了解内部结构,性能瓶颈是 ArrayConverter.toCatalystImpl.

As far as I understand the internals, performance bottleneck is ArrayConverter.toCatalystImpl.

看起来每次调用都会调用它 MutableAggregationBuffer.update,然后为每个分配新的GenericArrayData>行.

It look like it is called for each call MutableAggregationBuffer.update, and in turn allocates new GenericArrayData for each Row.

如果我们重新定义 bufferSchema 为:

If we redefine bufferSchema as:

def bufferSchema: StructType = {
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )
}

updatemerge 都可以表示为缓冲区中原始值的简单替换.调用链会很长,但是 它不需要复制/转换和疯狂的分配.省略 null 检查你需要类似于

both update and merge can be expressed as plain replacements of primitive values in the buffer. Call chain will remain pretty long, but it won't require copies / conversions and crazy allocations. Omitting null checks you'll need something similar to

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))

for(i <- 0 to nBuckets){
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}

分别.

最后evaluate应该取Row并将其转换为输出Seq:

Finally evaluate should take Row and convert it to output Seq:

 for (i <- 0 to nBuckets)  yield buffer.getLong(i)

请注意,在这个实现中,一个可能的瓶颈是 merge.虽然它不应该引入任何新的性能问题,但是对于 M 个存储桶,对 merge 的每次调用都是 O(M).

Please note that in this implementation a possible bottleneck is merge. While it shouldn't introduce any new performance problems, with M buckets, each call to merge is O(M).

使用 K 个唯一键和 P 个分区,在最坏的情况下,它将被调用 M * K 次,其中每个键,每个分区至少发生一次.这有效地将 merge 组件的复杂性增加到 O(M * N * K).

With K unique keys, and P partitions it will be called M * K times in the worst case scenario, where each key, occurs at least once on each partition. This effectively increases complicity of the merge component to O(M * N * K).

总的来说,您对此无能为力.但是,如果您对数据分布做出特定假设(数据稀疏,密钥分布均匀),则可以稍微缩短一些事情,然后先洗牌:

In general there is not much you can do about it. However if you make specific assumptions about the data distribution (data is sparse, key distribution is uniform), you can shortcut things a bit, and shuffle first:

df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))

如果满足假设,它应该:

If the assumptions are satisfied it should:

  • 通过对稀疏对进行混洗而不是像密集数组一样的 Rows 来减少混洗大小.
  • 仅使用更新聚合数据(每个O(1))可能仅作为索引的子集接触.
  • Counterintuitively reduce shuffle size by shuffling sparse pairs, instead of dense array-like Rows.
  • Aggregate data using updates only (each O(1)) possibly touching only as subset of indices.

然而,如果一个或两个假设不满足,您可以预期 shuffle 大小会增加,而更新数量将保持不变.同时,数据倾斜会使事情比 update - shuffle - merge 场景更糟.

However if one or both assumptions are not satisfied, you can expect that shuffle size will increase while number of updates will stay the same. At the same time data skews can make things even worse than in update - shuffle - merge scenario.

使用 Aggregator 和强"类型的 Dataset:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
  }

  def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
      i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
  }

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

可以如下使用

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)

+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+

注意:

另见 SPARK-27296 - 用户定义的聚合函数(UDAF) 有一个主要的效率问题

这篇关于Spark UDAF 与 ArrayType 作为 bufferSchema 性能问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆