Question about joining dataframes in Spark


Problem description

Suppose I have two partitioned dataframes:

# Both dataframes are hash-partitioned into 3 partitions by (key1, key2).
df1 = spark.createDataFrame(
    [(x, x, x) for x in range(5)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')

df2 = spark.createDataFrame(
    [(x, x, x) for x in range(7)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')

(Scenario 1) If I join them on [key1, key2], the join is performed within each partition without a shuffle (the number of partitions in the resulting dataframe stays the same):

x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3
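
A quick way to see what happened here (a check that is not in the original question) is to print the physical plan and look for Exchange steps above the repartitioned inputs:

# If the existing (key1, key2) partitioning is reused, there is no extra
# Exchange above the repartition; an absent shuffle can also mean the
# planner chose a broadcast join instead (see the answer below).
x.explain()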

(Scenario 2) But if I join them on [key1, key2, time], a shuffle takes place (the number of partitions in the resulting dataframe is 200, which is driven by the spark.sql.shuffle.partitions option):

x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 200
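
The 200 here is simply the default value of spark.sql.shuffle.partitions. A minimal sketch of reading and changing it at runtime (the value 12 is just an example) shows that this only changes how many partitions the shuffle produces; it does not remove the shuffle itself:

# Default is 200.
print(spark.conf.get("spark.sql.shuffle.partitions"))

spark.conf.set("spark.sql.shuffle.partitions", 12)
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
# In the same environment as scenario 2 (no broadcast join), the shuffled
# result now has 12 partitions instead of 200.
print(x.rdd.getNumPartitions())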

At the same time, groupBy and window operations on [key1, key2, time] preserve the number of partitions and are done without a shuffle:

from pyspark.sql import functions as F

x = df1.groupBy('key1', 'key2', 'time').agg(F.count('*'))
assert x.rdd.getNumPartitions() == 3
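
For completeness, a window partitioned by the same three columns behaves the same way; this is a sketch matching the groupBy example above (Window comes from pyspark.sql.window):

from pyspark.sql.window import Window

# A window partitioned by all three columns also reuses the existing
# (key1, key2) partitioning, so no shuffle to 200 partitions is introduced.
w = Window.partitionBy('key1', 'key2', 'time')
x = df1.withColumn('cnt', F.count('*').over(w))
assert x.rdd.getNumPartitions() == 3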

I can't understand whether this is a bug or there are reasons for performing the shuffle operation in the second scenario. And how can I avoid the shuffle, if that is possible?

Recommended answer

I guess I was able to figure out the reason for the different results in Python and Scala.

The reason is broadcast optimisation. If spark-shell is started with broadcast disabled, both Python and Scala work identically.

./spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1

val df1 = Seq(
  (1, 1, 1)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val df2 = Seq(
  (1, 1, 1),
  (2, 2, 2)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val x = df1.join(df2, usingColumns = Seq("key1", "key2", "time"))

x.rdd.getNumPartitions == 200
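
The same check can be done from PySpark without restarting the shell (a sketch using the standard runtime config). With broadcast disabled, the Python join from the question still goes through the shuffle, matching the Scala result above:

# Disable broadcast joins at runtime and re-run the three-key join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
print(x.rdd.getNumPartitions())  # 200 with the default spark.sql.shuffle.partitions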

So it looks like Spark 2.4.0 isn't able to optimise the described case out of the box, and a Catalyst optimizer extension is needed, as suggested by @user10938362.
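
Short of writing such an extension, one practical workaround (a sketch of my own, not part of the original answer) is to request the broadcast explicitly on the PySpark side, since a broadcast hash join has no shuffle stage:

from pyspark.sql import functions as F

# Hypothetical workaround: explicitly broadcast the smaller dataframe.
# A broadcast hash join streams df1's partitions as-is, so the result
# should keep df1's 3 partitions instead of being shuffled into 200.
x = df1.join(F.broadcast(df2), on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 3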

BTW, here is some info about writing Catalyst optimizer extensions: https://developer.ibm.com/code/2017/11/30/learn-extension-points-apache-spark-extend-spark-catalyst-optimizer/
