Avoid performance impact of a single partition mode in Spark window functions


Problem Description

My question is triggered by the use case of calculating the differences between consecutive rows in a Spark dataframe.

For example, I have:

>>> df.show()
+-----+----------+
|index|      col1|
+-----+----------+
|  0.0|0.58734024|
|  1.0|0.67304325|
|  2.0|0.85154736|
|  3.0| 0.5449719|
+-----+----------+

If I choose to calculate these using "Window" functions, then I can do that like so:

>>> from pyspark.sql.window import Window
>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index|      col1| diffs_col1|
+-----+----------+-----------+
|  0.0|0.58734024|0.085703015|
|  1.0|0.67304325| 0.17850411|
|  2.0|0.85154736|-0.30657548|
|  3.0| 0.5449719|       null|
+-----+----------+-----------+

Question: I explicitly partitioned the dataframe in a single partition. What is the performance impact of this and, if there is, why is that so and how could I avoid it? Because when I do not specify a partition, I get the following warning:

16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
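
As a quick, illustrative check (not part of the original question), the underlying RDD reports how many partitions the windowed result is split into:

>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).rdd.getNumPartitions()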

Recommended Answer

In practice, the performance impact will be almost the same as if you omitted the partitionBy clause altogether. All records will be shuffled to a single partition, sorted locally, and iterated over sequentially, one by one.
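
One way to see where that shuffle happens is to inspect the physical plan; the exact plan text differs between Spark versions, but the exchange step that collapses everything into a single partition is typically visible there (a quick, illustrative check):

df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(Window.orderBy("index")) - f.col("col1")
).explain()  # look for the exchange/shuffle step in the printed plan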

The only difference is the total number of partitions created. Let's illustrate that with a simple dataset of 1000 records spread over 10 partitions.
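
The snippets below assume an active SparkSession bound to spark together with the usual imports; a minimal setup sketch (the app name is arbitrary):

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("window-partition-demo").getOrCreate()

With that in place, the example dataset is: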

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))

If you define a frame without a partitionBy clause

w_unpart = Window.orderBy(f.col("index").asc())

and use it with lag

df_lag_unpart = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)

there will be only one partition in total:

df_lag_unpart.rdd.glom().map(len).collect()

[1000]

Compared to that, a frame definition with a dummy index (simplified a bit compared to your code)

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

will use a number of partitions equal to spark.sql.shuffle.partitions:

spark.conf.set("spark.sql.shuffle.partitions", 11)

df_lag_part = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)

df_lag_part.rdd.glom().count()

11

with only one non-empty partition:

df_lag_part.rdd.glom().filter(lambda x: x).count()

1
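
Listing the per-partition sizes makes the skew explicit: since every record gets the same constant partition key, this should come back as ten zeros and a single 1000, with the position of the non-empty slot determined by how that constant hashes:

df_lag_part.rdd.glom().map(len).collect()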

Unfortunately, there is no universal solution that can be used to address this problem in PySpark; it is simply an inherent mechanism of the implementation combined with the distributed processing model.

Since the index column is sequential, you could generate an artificial partitioning key with a fixed number of records per block:

rec_per_block  = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

df_with_block = df.withColumn(
    "block", (f.col("index") / rec_per_block).cast("int")
)

and use it to define the frame specification:

w_with_block = Window.partitionBy("block").orderBy("index")

df_lag_with_block = df_with_block.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)

This will use the expected number of partitions:

df_lag_with_block.rdd.glom().count()

11

with roughly uniform data distribution (we cannot avoid hash collisions):

df_lag_with_block.rdd.glom().map(len).collect()

[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]

but with a number of gaps on the block boundaries:

df_lag_with_block.where(f.col("diffs_col1").isNull()).count()

12
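
Those 12 nulls are the first rows of the 12 blocks: with rec_per_block = 1000 // 11 = 90, casting index / 90 to int yields blocks 0 through 11 (eleven blocks of 90 rows plus a final block of 10), and the first row of each block has no preceding row inside its own partition. A quick way to check the block sizes (not in the original answer):

df_with_block.groupBy("block").count().orderBy("block").show()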

Since the boundaries are easy to compute:

from itertools import chain

boundary_idxs = sorted(chain.from_iterable(
    # Here we depend on sequential identifiers
    # This could be generalized to any monotonically increasing
    # id by taking min and max per block
    (idx - 1, idx) for idx in 
    df_lag_with_block.groupBy("block").min("index")
        .drop("block").rdd.flatMap(lambda x: x)
        .collect()))[2:]  # The first boundary doesn't carry useful inf.
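# For the 1000-row example (blocks of 90 rows) this yields
# [89, 90, 179, 180, ..., 989, 990]: the last index of each block
# paired with the first index of the following block.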

you can always select:

missing = df_with_block.where(f.col("index").isin(boundary_idxs))

and fill these separately:

# We use window without partitions here. Since number of records
# will be small this won't be a performance issue
# but will generate "Moving all data to a single partition" warning
missing_with_lag = missing.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))

and join:

combined = (df_lag_with_block
    .join(missing_with_lag, ["index"], "leftouter")
    .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

to get the desired result:

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
    combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0
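
For convenience, the whole recipe can be folded into a single helper. The function below is an illustrative sketch rather than code from the original answer: the name diffs_by_block and its signature are invented here, and it assumes a 0-based, gap-free integer index column plus an active SparkSession bound to spark, exactly as in the snippets above.

from pyspark.sql import functions as f
from pyspark.sql.window import Window

def diffs_by_block(df, index_col="index", value_col="col1", out_col="diffs_col1"):
    # Fixed number of records per artificial partition block.
    # Assumes the global SparkSession `spark`, as elsewhere in this answer.
    n_part = int(spark.conf.get("spark.sql.shuffle.partitions"))
    rec_per_block = df.count() // n_part

    # Block-local lag: correct everywhere except the first row of each block.
    blocked = df.withColumn("block", (f.col(index_col) / rec_per_block).cast("int"))
    w_block = Window.partitionBy("block").orderBy(index_col)
    w_global = Window.orderBy(index_col)

    with_lag = blocked.withColumn(
        out_col, f.lag(value_col, 1).over(w_block) - f.col(value_col)
    )

    # Rows around block boundaries, recomputed with a global window.
    # Only a handful of rows are involved, so the single-partition
    # shuffle (and its warning) is harmless here.
    starts = [r[0] for r in blocked.groupBy("block").min(index_col).drop("block").collect()]
    boundary_idxs = sorted(
        i for start in starts for i in (start - 1, start) if i >= 0
    )[1:]  # drop index 0: the very first row has no predecessor anyway

    fills = (blocked.where(f.col(index_col).isin(boundary_idxs))
             .withColumn(out_col, f.lag(value_col, 1).over(w_global) - f.col(value_col))
             .select(index_col, f.col(out_col).alias("diffs_fill")))

    return (with_lag.join(fills, [index_col], "leftouter")
            .withColumn(out_col, f.coalesce(out_col, "diffs_fill"))
            .drop("diffs_fill", "block"))

On the example dataset, diffs_by_block(df) should reproduce df_lag_unpart row for row, which is what the assert above checks for the step-by-step version.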
