Avoid performance impact of a single partition mode in Spark window functions
Question
My question is triggered by the use case of calculating the differences between consecutive rows in a Spark DataFrame.
For example, I have:
>>> df.show()
+-----+----------+
|index| col1|
+-----+----------+
| 0.0|0.58734024|
| 1.0|0.67304325|
| 2.0|0.85154736|
| 3.0| 0.5449719|
+-----+----------+
If I choose to calculate these using Window functions, then I can do it like so:
>>> from pyspark.sql.window import Window
>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index| col1| diffs_col1|
+-----+----------+-----------+
| 0.0|0.58734024|0.085703015|
| 1.0|0.67304325| 0.17850411|
| 2.0|0.85154736|-0.30657548|
| 3.0| 0.5449719| null|
+-----+----------+-----------+
Question: I explicitly partitioned the DataFrame into a single partition. What is the performance impact of this and, if there is one, why is that so and how could I avoid it? Because when I do not specify a partition, I get the following warning:
16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Answer
In practice, the performance impact will be almost the same as if you omitted the partitionBy clause altogether. All records will be shuffled to a single partition, sorted locally, and iterated over sequentially, one by one.
The difference is only in the total number of partitions created. Let's illustrate that with an example using a simple dataset with 10 partitions and 1000 records:
from pyspark.sql.window import Window
import pyspark.sql.functions as f

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))
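A quick check, not part of the original answer, confirms that the input DataFrame really starts out with 10 partitions:

df.rdd.getNumPartitions()
10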
If you define a frame without a partitionBy clause:
w_unpart = Window.orderBy(f.col("index").asc())
and use it with lag:
df_lag_unpart = df.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)
there will be only one partition in total:
df_lag_unpart.rdd.glom().map(len).collect()
[1000]
Compared to that, a frame definition with a dummy index (simplified a bit compared to your code):
w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())
will use a number of partitions equal to spark.sql.shuffle.partitions:
spark.conf.set("spark.sql.shuffle.partitions", 11)
df_lag_part = df.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)
df_lag_part.rdd.glom().count()
11
with only one non-empty partition:
df_lag_part.rdd.glom().filter(lambda x: x).count()
1
Unfortunately, there is no universal solution which can be used to address this problem in PySpark. It is just an inherent mechanism of the implementation, combined with the distributed processing model.
Since the index column is sequential, you can generate an artificial partitioning key with a fixed number of records per block:
rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))
df_with_block = df.withColumn(
"block", (f.col("index") / rec_per_block).cast("int")
)
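As a quick sanity check (a small aside added here, not part of the original answer), you can confirm that the key splits the data into blocks of roughly rec_per_block rows each:

df_with_block.groupBy("block").count().orderBy("block").show()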
and use it to define the frame specification:
w_with_block = Window.partitionBy("block").orderBy("index")
df_lag_with_block = df_with_block.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)
This will use the expected number of partitions:
df_lag_with_block.rdd.glom().count()
11
with a roughly uniform data distribution (we cannot avoid hash collisions):
df_lag_with_block.rdd.glom().map(len).collect()
[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]
but with a number of gaps on the block boundaries:
df_lag_with_block.where(f.col("diffs_col1").isNull()).count()
12
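An optional check added here (not in the original answer): these nulls sit exactly on the first row of each block, i.e. on the block boundaries, because lag over a partitioned window has nothing to look back at on the first row of a partition:

(df_lag_with_block
    .where(f.col("diffs_col1").isNull())
    .select("block", "index")
    .orderBy("index")
    .show())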
Since the boundaries are easy to compute:
from itertools import chain
boundary_idxs = sorted(chain.from_iterable(
# Here we depend on sequential identifiers
# This could be generalized to any monotonically increasing
# id by taking min and max per block
(idx - 1, idx) for idx in
df_lag_with_block.groupBy("block").min("index")
.drop("block").rdd.flatMap(lambda x: x)
.collect()))[2:] # The first boundary doesn't carry useful information.
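As an illustrative aside (assuming the same 1000-record dataset and spark.sql.shuffle.partitions = 11 as above, so rec_per_block is 90), the boundary indices come out in pairs around each block start:

boundary_idxs[:6]
# expected with this setup: [89, 90, 179, 180, 269, 270]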
With those boundary indices you can always select:
missing = df_with_block.where(f.col("index").isin(boundary_idxs))
and fill these in separately:
# We use window without partitions here. Since number of records
# will be small this won't be a performance issue
# but will generate "Moving all data to a single partition" warning
missing_with_lag = missing.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))
and join:
combined = (df_lag_with_block
.join(missing_with_lag, ["index"], "leftouter")
.withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))
to get the desired result:
mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0
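If you want to further confirm that the blocked version avoids the single-partition window path, you can also inspect the physical plan (a small addition, not part of the original answer); the Window operator should be partitioned by block instead of triggering the "Moving all data to a single partition" warning:

df_lag_with_block.explain()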