How to (equally) partition array-data in spark dataframe
Question
I have a dataframe of the following form:
import scala.util.Random
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble)))
val df = sc.parallelize(localData).toDF("id","data")
|-- id: integer (nullable = false)
|-- data: array (nullable = true)
| |-- element: double (containsNull = false)
df.withColumn("data_size",size($"data")).show
+---+--------------------+---------+
| id| data|data_size|
+---+--------------------+---------+
| 1|[0.77845301260182...| 217|
| 2|[0.28806915178410...| 202|
| 3|[0.76304121847720...| 165|
| 4|[0.57955190088558...| 9|
| 5|[0.82134215959459...| 11|
| 6|[0.42193739241567...| 57|
| 7|[0.76381645621403...| 4|
| 8|[0.56507523859466...| 93|
| 9|[0.83541853717244...| 107|
| 10|[0.77955626749231...| 111|
| 11|[0.83721643562080...| 223|
| 12|[0.30546029947285...| 116|
| 13|[0.02705462199952...| 46|
| 14|[0.46646815407673...| 41|
| 15|[0.66312488908446...| 16|
| 16|[0.72644646115640...| 166|
| 17|[0.32210572380128...| 197|
| 18|[0.66680355567329...| 61|
| 19|[0.87055594653295...| 55|
| 20|[0.96600507545438...| 89|
+---+--------------------+---------+
Now I want to apply an expensive UDF whose computation time is roughly proportional to the size of the data array. I wonder how I can repartition my data such that each partition holds approximately the same amount of "records * data_size" (i.e., data points, NOT just records).
If I just do df.repartition(100), I may get some partitions containing very large arrays, which then become the bottleneck of the entire Spark stage (all other tasks having already finished). Of course I could just choose an insanely large number of partitions, which would (almost) ensure that each record ends up in a separate partition. But is there another way?
Answer
As you said, you can increase the number of partitions. I usually use a multiple of the number of cores: spark context default parallelism * 2-3. In your case, you could use a bigger multiplier.
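A minimal sketch of that rule of thumb (the multiplier of 3 and the helper name are illustrative assumptions, not Spark API):

```scala
// Sketch: derive a partition count from the cluster's default parallelism.
// The multiplier is a tuning knob; 2-3 is the usual starting point.
def partitionCount(defaultParallelism: Int, multiplier: Int = 3): Int =
  defaultParallelism * multiplier

// With a SparkContext in scope this would be applied as:
//   val repartitioned = df.repartition(partitionCount(sc.defaultParallelism))
```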
Another solution would be to split your df in this way:
- a df with only the larger arrays
- a df with the rest
You could then repartition each of them, perform the computation, and union them back.
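A rough sketch of the split/repartition/union idea. The size threshold of 100 and the partition counts below are assumptions to tune for your data:

```scala
// Pure helper deciding which bucket a record falls into, based on the
// size of its data array (the threshold is an illustrative assumption).
def isLarge(dataSize: Int, threshold: Int = 100): Boolean = dataSize > threshold

// With the DataFrame API this would drive two filters that are
// repartitioned separately and unioned back:
//   val big      = df.filter(size($"data") >  100).repartition(200)
//   val small    = df.filter(size($"data") <= 100).repartition(50)
//   val balanced = big.union(small)
```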
Beware that repartitioning may be expensive, since you have large rows to shuffle around.
You could have a look at these slides (27+): https://www.slideshare.net/SparkSummit/custom-applications-with-sparks-rdd-spark-summit-east-talk-by-tejas-patil
They were experiencing very bad data skew and had to handle it in an interesting way.
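The load-balancing idea behind a custom partitioner can be sketched in plain Scala: greedily assign each record to the currently least-loaded partition, weighting by its array size. This helper is an illustration of the technique, not a Spark API:

```scala
import scala.collection.mutable

// Greedy load balancing: place each (id, dataSize) record on the partition
// with the smallest accumulated size so far. Returns id -> partition index.
def balanceBySize(sizes: Seq[(Int, Int)], numPartitions: Int): Map[Int, Int] = {
  val load = mutable.ArrayBuffer.fill(numPartitions)(0L)
  // Placing the largest arrays first gives a tighter packing.
  sizes.sortBy(-_._2).map { case (id, s) =>
    val p = load.zipWithIndex.minBy(_._1)._2
    load(p) += s
    id -> p
  }.toMap
}
```

The resulting id -> partition map could then be joined back as a key column and used with repartitioning on that key; the heaviest records end up spread across partitions instead of clustering in one.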