How to (equally) partition array-data in spark dataframe

Problem description

I have a dataframe of the following form:

import scala.util.Random
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble)))
val df = sc.parallelize(localData).toDF("id","data")

|-- id: integer (nullable = false)
|-- data: array (nullable = true)
|    |-- element: double (containsNull = false)


df.withColumn("data_size",size($"data")).show

+---+--------------------+---------+
| id|                data|data_size|
+---+--------------------+---------+
|  1|[0.77845301260182...|      217|
|  2|[0.28806915178410...|      202|
|  3|[0.76304121847720...|      165|
|  4|[0.57955190088558...|        9|
|  5|[0.82134215959459...|       11|
|  6|[0.42193739241567...|       57|
|  7|[0.76381645621403...|        4|
|  8|[0.56507523859466...|       93|
|  9|[0.83541853717244...|      107|
| 10|[0.77955626749231...|      111|
| 11|[0.83721643562080...|      223|
| 12|[0.30546029947285...|      116|
| 13|[0.02705462199952...|       46|
| 14|[0.46646815407673...|       41|
| 15|[0.66312488908446...|       16|
| 16|[0.72644646115640...|      166|
| 17|[0.32210572380128...|      197|
| 18|[0.66680355567329...|       61|
| 19|[0.87055594653295...|       55|
| 20|[0.96600507545438...|       89|
+---+--------------------+---------+

Now I want to apply an expensive UDF whose computation time is roughly proportional to the size of the data array. I wonder how I can repartition my data such that each partition holds approximately the same amount of "records * data_size" (i.e., data points, NOT just records).
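
For concreteness, a hypothetical stand-in for such a UDF (the name expensiveUdf and its body are illustrative only, not part of the original question):

import org.apache.spark.sql.functions.udf

// Hypothetical UDF whose cost grows with the array length,
// standing in for the real (expensive) per-element computation.
val expensiveUdf = udf { (xs: Seq[Double]) =>
  xs.map(x => math.pow(math.sin(x), 2)).sum
}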

If I just do df.repartition(100), I may get some partitions containing very large arrays, which then become the bottleneck of the entire Spark stage (all other tasks having already finished). Of course I could just choose an insanely large number of partitions, which would (almost) ensure that each record ends up in a separate partition. But is there another way?

Recommended answer

As you said, you can increase the number of partitions. I usually use a multiple of the number of cores: Spark context default parallelism * 2-3.
In your case, you could use a bigger multiplier.
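
A minimal sketch of that rule of thumb, assuming sc is the SparkContext from the question and the factor 3 is just an illustrative choice:

// Repartition to a multiple of the cluster's default parallelism;
// the multiplier (here 3) is a tuning knob, not a fixed recommendation.
val numPartitions = sc.defaultParallelism * 3
val repartitioned = df.repartition(numPartitions)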

Another solution would be to split your df in this way:

  • a df with only the bigger arrays
  • a df with the rest

You could then repartition each of them, perform the computation and union them back (see the sketch below).
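
A minimal sketch of that split/repartition/union idea, assuming the placeholder expensiveUdf from above; the size threshold and partition counts are illustrative values, not numbers from the original answer:

import org.apache.spark.sql.functions.{col, size}

// Illustrative threshold separating "big" arrays from the rest.
val sizeThreshold = 150

val withSize = df.withColumn("data_size", size(col("data")))

// Big arrays get spread over many partitions; the rest keep fewer.
val bigArrays   = withSize.filter(col("data_size") >= sizeThreshold).repartition(200)
val smallArrays = withSize.filter(col("data_size") <  sizeThreshold).repartition(50)

// Apply the expensive computation to each part, then union the results back.
val result = bigArrays.withColumn("result", expensiveUdf(col("data")))
  .union(smallArrays.withColumn("result", expensiveUdf(col("data"))))

Both halves keep the same schema, so the union just concatenates the results while letting each half run with a partitioning that matches its row weight.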

Beware that repartitioning may be expensive, since you have large rows to shuffle around.

You could have a look at these slides (slide 27 onwards): https://www.slideshare.net/SparkSummit/custom-applications-with-sparks-rdd-spark-summit-east-talk-by-tejas-patil

They were experiencing very bad data skew and had to handle it in an interesting way.

