Avoid performance impact of a single partition mode in Spark window functions


Problem Description


My question is triggered by the use case of calculating the differences between consecutive rows in a Spark dataframe.

For example, I have:

>>> df.show()
+-----+----------+
|index|      col1|
+-----+----------+
|  0.0|0.58734024|
|  1.0|0.67304325|
|  2.0|0.85154736|
|  3.0| 0.5449719|
+-----+----------+

If I choose to calculate these using "Window" functions, then I can do that like so:

>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index|      col1| diffs_col1|
+-----+----------+-----------+
|  0.0|0.58734024|0.085703015|
|  1.0|0.67304325| 0.17850411|
|  2.0|0.85154736|-0.30657548|
|  3.0| 0.5449719|       null|
+-----+----------+-----------+

Question: I explicitly partitioned the dataframe into a single partition. What is the performance impact of this, and if there is one, why is that so and how can I avoid it? I ask because when I do not specify a partition, I get the following warning:

16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Solution

In practice, the performance impact will be almost the same as if you omitted the partitionBy clause altogether. All records will be shuffled to a single partition, sorted locally, and then iterated over sequentially, one by one.
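
The snippets that follow assume a running SparkSession and the usual imports; here is a minimal setup sketch (the original code takes spark, f and Window for granted):

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f

# Reuse an existing session if one is already running (e.g. in pyspark or a notebook).
spark = SparkSession.builder.getOrCreate()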

The difference is only in the total number of partitions created. Let's illustrate that with an example using a simple dataset with 10 partitions and 1000 records:

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))
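
(The fourth argument to spark.range is the number of partitions, so the input DataFrame starts out with 10 of them; a quick check:)

df.rdd.getNumPartitions()
# expected: 10 -- the partition count requested above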

If you define a frame without a partitionBy clause

w_unpart = Window.orderBy(f.col("index").asc())

and use it with lag

df_lag_unpart = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)

there will be only one partition in total:

df_lag_unpart.rdd.glom().map(len).collect()

[1000]
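
As an aside, glom() wraps each partition's contents in a single list, so map(len) gives per-partition row counts and counting the glommed RDD counts partitions rather than rows. The same single-partition shuffle is also visible in the physical plan (a sketch; exact plan text varies across Spark versions):

df.rdd.glom().map(len).collect()
# expected: [100, 100, ..., 100] -- the 10 input partitions of 100 rows each

df_lag_unpart.explain()
# the plan should contain an `Exchange SinglePartition` step feeding the sort
# and window operators -- exactly what the WindowExec warning refers to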

Compared to that, a frame definition with a dummy partitioning key (simplified a bit compared to your code)

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

will use a number of partitions equal to spark.sql.shuffle.partitions:

spark.conf.set("spark.sql.shuffle.partitions", 11)

df_lag_part = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)

df_lag_part.rdd.glom().count()

11

with only one non-empty partition:

df_lag_part.rdd.glom().filter(lambda x: x).count()

1
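
Looking at the sizes directly shows the same thing: every row is hashed to the same constant key, so one of the 11 shuffle partitions receives all 1000 records and the rest stay empty (which bucket ends up non-empty depends on how the constant hashes; a sketch):

df_lag_part.rdd.glom().map(len).collect()
# expected: something like [0, 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0]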

Unfortunately, there is no universal solution that can be used to address this problem in PySpark. This is just an inherent mechanism of the implementation combined with the distributed processing model.

Since the index column is sequential, you can generate an artificial partitioning key with a fixed number of records per block:

rec_per_block  = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

df_with_block = df.withColumn(
    "block", (f.col("index") / rec_per_block).cast("int")
)

and use it to define the frame specification:

w_with_block = Window.partitionBy("block").orderBy("index")

df_lag_with_block = df_with_block.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)
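
As a quick sanity check (assuming spark.sql.shuffle.partitions is still 11 as set above), rec_per_block is 1000 // 11 = 90, so the block values run from 0 (indexes 0-89) up to 11 (indexes 990-999):

df_with_block.select("block").distinct().count()
# expected: 12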

This will use the expected number of partitions:

df_lag_with_block.rdd.glom().count()

11

with a roughly uniform data distribution (we cannot avoid hash collisions):

df_lag_with_block.rdd.glom().map(len).collect()

[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]

but with a number of gaps on the block boundaries:

df_lag_with_block.where(f.col("diffs_col1").isNull()).count()

12
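
These nulls sit at the first row of each of the 12 blocks (the block minima 0, 90, 180, ..., 990); apart from the very first one, which would be null anyway, they are artifacts of the blocking and need to be filled in. A quick way to see where they are (a sketch):

(df_lag_with_block
    .where(f.col("diffs_col1").isNull())
    .orderBy("index")
    .select("index")
    .show())
# expected indexes: 0, 90, 180, ..., 990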

Since the boundaries are easy to compute:

from itertools import chain

boundary_idxs = sorted(chain.from_iterable(
    # Here we depend on sequential identifiers
    # This could be generalized to any monotonically increasing
    # id by taking min and max per block
    (idx - 1, idx) for idx in 
    df_lag_with_block.groupBy("block").min("index")
        .drop("block").rdd.flatMap(lambda x: x)
        .collect()))[2:]  # The first boundary doesn't carry useful information.
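
For intuition, with the sequential 1000-row index and 90 records per block, the per-block minima are 0, 90, ..., 990, so after dropping the uninformative leading pair (-1, 0) the list should pair each block's last row with the next block's first row:

print(boundary_idxs)
# expected: [89, 90, 179, 180, 269, 270, ..., 989, 990]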

you can always select the rows at those boundaries:

missing = df_with_block.where(f.col("index").isin(boundary_idxs))

and fill these separately:

# We use a window without partitions here. Since the number of records
# is small, this won't be a performance issue,
# but it will generate the "Moving all data to a single partition" warning.
missing_with_lag = missing.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))
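
Note that within missing the lag pairs each block's first row with the last row of the previous block, which is exactly the cross-block difference we need. The lag values computed for the block-final rows (89, 179, ...) are not meaningful differences, but they are never used: those rows already carry a non-null diffs_col1, so the coalesce below keeps the original value. A quick way to inspect this (a sketch):

missing_with_lag.orderBy("index").show()
# expected: diffs_fill is null for index 89, holds the true difference for 90,
# and so on; only the rows at the block minima (90, 180, ..., 990) matter downstream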

Finally, join the fill values back:

combined = (df_lag_with_block
    .join(missing_with_lag, ["index"], "leftouter")
    .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

to get the desired result:

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
    combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0
