How to avoid shuffles while joining DataFrames on unique keys?


Question

I have two DataFrames A and B:

  • A has columns (id, info1, info2) and about 200 million rows
  • B has only the column id, with 1 million rows

The id column is unique in both DataFrames.

I want a new DataFrame which filters A to only include values from B.

If B were very small, I know I would do something along the lines of

// Collect B's ids to the driver and filter A locally (only viable when B is tiny).
A.filter($"id".isin(B.select("id").collect().map(_.getLong(0)): _*)) // assumes id is a Long

but B is still pretty large, so not all of it can fit as a broadcast variable.
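For scale, this is what the broadcast variant would look like if B did fit in memory: a broadcast left-semi join keeps only A's rows whose id appears in B, without shuffling A at all. A minimal sketch using the A, B, and id from above (broadcast is the join hint from org.apache.spark.sql.functions):

 import org.apache.spark.sql.functions.broadcast

 // Ship B to every executor and keep only the matching rows of A.
 val filtered = A.join(broadcast(B), Seq("id"), "left_semi")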

I know I can use

A.join(B, Seq("id"))

but that wouldn't harness the uniqueness, and I'm afraid it will cause unnecessary shuffles.

What is the optimal method to achieve that task?

Answer

If you have not applied any partitioner on DataFrame A, this may help you understand the join and shuffle concepts.

Without a partitioner:

A.join(B, Seq("id"))

By default, this operation hashes all the keys of both DataFrames, sends elements with the same key hash across the network to the same machine, and then joins the elements with the same key on that machine. Note that both DataFrames are shuffled across the network.
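You can confirm this in the physical plan (the exact wording varies by Spark version):

 A.join(B, Seq("id")).explain()
 // The plan shows an Exchange hashpartitioning(id, ...) under both inputs,
 // i.e. both A and B are redistributed across the network before the join.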

With a HashPartitioner: call partitionBy() when building A; Spark will then know that it is hash-partitioned, and calls to join() on it will take advantage of this information. In particular, when we call A.join(B, Seq("id")), Spark will shuffle only the B side. Since B has less data than A, you don't need to apply a partitioner to B.

For example:

 import org.apache.spark.HashPartitioner

 // Load A as a pair RDD keyed by id (value = serialized info1/info2; types are illustrative).
 val A = sc.sequenceFile[Long, String]("hdfs://...")
     .partitionBy(new HashPartitioner(100)) // create 100 hash partitions
     .persist()
 // B must also be a pair RDD keyed by id; only B's side is shuffled.
 A.join(B)

Reference is from the Learning Spark book.
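Note that partitionBy(new HashPartitioner(...)) is the pair-RDD API; DataFrames don't expose it directly. A rough DataFrame-side sketch of the same idea, assuming A is reused across several joins: repartition A by the join key once and cache it, so that (depending on the Spark version and spark.sql.shuffle.partitions) the join can reuse A's partitioning and shuffle only B:

 import org.apache.spark.sql.functions.col

 // 200 matches the default spark.sql.shuffle.partitions; adjust to your config.
 val partitionedA = A.repartition(200, col("id")).cache()
 partitionedA.join(B, Seq("id"))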
