Efficient pyspark join


Problem description

I've read a lot about how to do efficient joins in pyspark. The ways to achieve efficient joins I've found are basically:

  • Use a broadcast join if you can. (I usually can't because the dataframes are too large; a minimal sketch of what I mean follows this list.)
  • Consider using a very large cluster. (I'd rather not because of $$$.)
  • Use the same partitioner.
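
For context, the broadcast join mentioned above only needs the broadcast hint around the smaller dataframe. A minimal sketch, assuming hypothetical large_df and small_df dataframes joined on a hypothetical emp_id key, with small_df small enough to be shipped to every executor:

from pyspark.sql.functions import broadcast
# small_df is replicated to every executor, so large_df does not need to be shuffled for the join
joined = large_df.join(broadcast(small_df), on='emp_id', how='inner')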

The last one is the one I'd rather try, but I can't find a way to do it in pyspark. I've tried:

df.repartition(numberOfPartitions, ['partition_col1', 'partition_col2'])

but it doesn't help; it still takes way too long before I stop it, because Spark gets stuck in the last few jobs.

So, how can I use the same partitioner in pyspark and speed up my joins, or even get rid of the shuffles that take forever? Which code do I need to use?

PD: I've checked other articles, even on stackoverflow, but I still can't see the code.

Recommended answer

You can also use a two-pass approach, in case it suits your requirement. First, re-partition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then, join the sub-partitions serially in a loop, "appending" to the same final result table. This was nicely explained by Sim; see the link below.

Two-pass approach to join big dataframes in pyspark

Based on the case explained above, I was able to join the sub-partitions serially in a loop and then persist the joined data to a Hive table.

Here is the code.

from pyspark.sql.functions import col

# Bucket both dataframes by emp_id modulo 5, then persist each as an ORC table partitioned by par_id
emp_df_1.withColumn("par_id", col('emp_id') % 5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id", col('emp_id') % 5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")

So, if you are joining on an integer emp_id, you can partition by the ID modulo some number. This way you redistribute the load across the Spark partitions, and records with similar keys are grouped together and reside on the same partition. You can then read and loop through each sub-partition's data, join the two dataframes, and persist them together.

counter = 0
partition_count = 4
while counter <= partition_count:
    # Read only the matching sub-partition from each table (partition pruning on par_id)
    query1 = "SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
    query2 = "SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
    EMP_DF1 = spark.sql(query1)
    EMP_DF2 = spark.sql(query2)
    df1 = EMP_DF1.alias('df1')
    df2 = EMP_DF2.alias('df2')
    # Join the two sub-partitions and keep the columns from the first dataframe
    innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id, 'inner').select('df1.*')
    innerjoin_EMP.show()
    # Append this sub-partition's result to the same final table
    innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
    counter = counter + 1

I have tried this and it works fine. This is just an example to demo the two-pass approach; your join conditions may vary, and the number of partitions will also depend on your data size.
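
One practical note: insertInto assumes the target table UDB.temptable already exists with a matching column order. A minimal sketch of one way to create it before running the loop, assuming the two partitioned tables above have already been written (the join selects df1.*, i.e. all columns of UDB.temptable_1, including par_id):

# Create an empty ORC target table with the same schema as the join output
spark.table("UDB.temptable_1").limit(0).write.format('orc').saveAsTable("UDB.temptable")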
