Efficient pyspark join

Problem description

I've read a lot about how to do efficient joins in PySpark. The ways I've found to achieve an efficient join are basically:

  • Use a broadcast join if you can. (I usually can't, because the dataframes are too large; see the sketch after this list.)
  • Consider using a very large cluster. (I'd rather not, because of $$$.)
  • Use the same partitioner.
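
For reference, a broadcast join in PySpark is applied by wrapping the smaller side of the join in the broadcast() hint. A minimal sketch, where large_df, small_df and the emp_id join key are made-up names used only for illustration:

from pyspark.sql.functions import broadcast

# Hypothetical dataframes: small_df is assumed to fit in executor memory.
# The broadcast() hint ships small_df to every executor, so large_df is not shuffled.
joined = large_df.join(broadcast(small_df), on='emp_id', how='inner')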

The last one is the one I'd rather try, but I can't find a way to do it in PySpark. I've tried:

df.repartition(numberOfPartitions, ['partition_col1', 'partition_col2'])

but it doesn't help; it still takes way too long before I stop it, because Spark gets stuck in the last few jobs.

So, how can I use the same partitioner in PySpark and speed up my joins, or even get rid of the shuffles that take forever? Which code do I need to use?

PS: I've checked other articles, even on Stack Overflow, but I still can't see any code.

Recommended answer

You can also use a two-pass approach, in case it suits your requirements. First, re-partition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then, join the sub-partitions serially in a loop, "appending" to the same final result table. This was nicely explained by Sim; see the link below:

Two-pass approach to join big dataframes in pyspark

Based on the case explained above, I was able to join the sub-partitions serially in a loop and then persist the joined data to a Hive table.

Here is the code:

from pyspark.sql.functions import col

# Derive par_id = emp_id % 5 and write each dataframe as an ORC table partitioned on it,
# so rows with the same join key end up in sub-partitions that can be joined pairwise.
emp_df_1.withColumn("par_id", col('emp_id') % 5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id", col('emp_id') % 5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")

So, if you are joining on an integer emp_id, you can partition by the ID modulo some number. That way the load is redistributed across the Spark partitions, and records whose keys share the same modulo value (in particular, identical keys) are grouped together and reside in the same sub-partition. You can then read and loop through each sub-partition, join the two dataframes, and persist the results together.

counter = 0
partition_count = 4
while counter <= partition_count:
    # Read one sub-partition from each table; both sides share the same par_id value.
    query1 = "SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
    query2 = "SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
    EMP_DF1 = spark.sql(query1)
    EMP_DF2 = spark.sql(query2)
    df1 = EMP_DF1.alias('df1')
    df2 = EMP_DF2.alias('df2')
    # Join only this sub-partition's rows and keep the left-hand columns.
    innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id, 'inner').select('df1.*')
    innerjoin_EMP.show()
    # insertInto appends to the target table, which must already exist (see note below).
    innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
    counter = counter + 1
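
Note that insertInto appends to an existing table, so UDB.temptable has to be created before the loop runs; the answer above does not show that step. One possible sketch, which simply copies the schema of UDB.temptable_1 into an empty ORC table, is given below (the empty-SELECT trick and the ORC format are assumptions, not part of the original answer):

# Hypothetical setup step: create an empty UDB.temptable with the same columns as
# UDB.temptable_1 (including par_id), so insertInto has a target to append to.
spark.sql("SELECT * FROM UDB.temptable_1 WHERE 1=0").write.format('orc').saveAsTable("UDB.temptable")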

I have tried this and it works fine. It is just an example to demonstrate the two-pass approach; your join conditions may vary, and the number of partitions will also depend on your data size.
