How can I efficiently join a large RDD to a very large RDD in Spark?

Question

I have two RDDs. One RDD has between 5 and 10 million entries and the other has between 500 and 750 million entries. At some point, I have to join these two RDDs using a common key.

val rddA = someData.rdd.map { x => (x.key, x) } // 10 million
val rddB = someData.rdd.map { y => (y.key, y) } // 600 million
val joinRDD = rddA.join(rddB)

When Spark performs this join, it decides to do a ShuffledHashJoin. This causes many of the items in rddB to be shuffled over the network. Likewise, some of rddA's items are also shuffled over the network. In this case, rddA is too "big" to use as a broadcast variable, but it seems like a BroadcastHashJoin would be more efficient. Is there a way to hint to Spark to use a BroadcastHashJoin? (Apache Flink supports this through join hints.)
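For intuition, here is what a broadcast hash join does, sketched with plain Scala collections rather than Spark (the object and method names are made up for illustration): the small side is materialized as an in-memory hash table and the large side is streamed against it, so the large side never moves across the network.

```scala
// Sketch of the broadcast (map-side) hash join idea using plain Scala
// collections; names are illustrative, not a Spark API.
object BroadcastJoinSketch {
  def hashJoin[K, A, B](small: Seq[(K, A)], large: Seq[(K, B)]): Seq[(K, (A, B))] = {
    // Build side: materialize the small dataset as a hash table
    // (assumes unique keys on the build side; toMap keeps the last value).
    val buildTable: Map[K, A] = small.toMap
    // Probe side: stream the large dataset and look each key up locally,
    // so the large side is never shuffled.
    large.flatMap { case (k, b) => buildTable.get(k).map(a => (k, (a, b))) }
  }
}
```

In the RDD API the same idea can be implemented by hand: collect the small RDD as a map, broadcast it with sc.broadcast, and probe it inside rddB.mapPartitions.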

If not, is the only option to increase the autoBroadcastJoinThreshold?

Update 7/14

My performance issue appears to be squarely rooted in repartitioning. Normally, an RDD read from HDFS would be partitioned by block, but in this case the source was a parquet datasource [that I made]. When Spark (Databricks) writes the parquet file, it writes one file per partition, and likewise it reads one partition per file. So the best answer I've found is, during production of the datasource, to partition it by key, write out the parquet sink (which is then naturally co-partitioned), and use that as rddB.

The answer given is correct, but I think the details about the parquet datasource may be useful to someone else.

Answer

You can partition the RDDs with the same partitioner; in this case, partitions with the same keys will be collocated on the same executor.

In this case you will avoid a shuffle for the join operations.

The shuffle will happen only once, when you update the partitioner, and if you cache the RDDs, all joins after that should be local to the executors:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

class A
class B

val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???

// The same partitioner on both sides puts equal keys in the same partition.
val partitioner = new HashPartitioner(1000)

// partitionBy returns a new RDD, so keep a reference to the cached result.
val partitionedA = rddA.partitionBy(partitioner).cache()
val partitionedB = rddB.partitionBy(partitioner).cache()
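Why co-partitioning removes the shuffle: a hash partitioner assigns a record to a partition from the key's hashCode alone, so two RDDs that share the same HashPartitioner(1000) put equal keys at the same partition index. A minimal sketch of that mapping (the object name is made up; the non-negative modulo mirrors what HashPartitioner.getPartition does):

```scala
// Sketch of how a hash partitioner picks a partition for a key.
// The mapping depends only on the key and the partition count, so the
// same key always lands at the same index under the same partitioner.
object PartitionerSketch {
  // Modulo that is always non-negative, even for negative hashCodes.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val r = x % mod
    if (r < 0) r + mod else r
  }

  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)
}
```

Because both sides agree on the index for every key, the join can proceed partition-by-partition with no data movement.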

You can also try increasing the broadcast threshold size; maybe rddA can be broadcast:

--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb

We use 400 MB for broadcast joins, and it works well.
