Join of two large PCollections has a performance issue


Problem Description

Joining two PCollections with the CoGroupByKey approach takes hours to execute for 8+ million records. I noted from another StackOverflow post the warning that "CoGbkResult has more than 10000 elements, reiteration (which may be slow) is required."

Any suggestion to improve this performance using this approach?

Here is the code snippet:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

PCollection<TableRow> pc1 = ...;
PCollection<TableRow> pc2 = ...;

WithKeys<String, TableRow> withKeyValue =
  WithKeys.of((TableRow row) -> String.format("%s", row.get("KEYNAME")))
          .withKeyType(TypeDescriptors.strings());

PCollection<KV<String, TableRow>> keyed_pc1 =
  pc1.apply("WithKeys", withKeyValue);

PCollection<KV<String, TableRow>> keyed_pc2 =
  pc2.apply("WithKeys", withKeyValue);

// (org.apache.beam.sdk.extensions.joinlibrary.Join class)
PCollection<KV<String, KV<TableRow, TableRow>>> joinedCollection =
  Join.innerJoin(keyed_pc1, keyed_pc2);
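Under the hood, the join library's innerJoin expands into a CoGroupByKey followed by a ParDo that iterates both sides of each CoGbkResult, which is where the "reiteration (which may be slow)" warning comes from. A minimal sketch of that expansion (the tag names are illustrative, not from the original post):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

final TupleTag<TableRow> leftTag = new TupleTag<>();
final TupleTag<TableRow> rightTag = new TupleTag<>();

// Group both keyed collections by key into one CoGbkResult per key.
PCollection<KV<String, CoGbkResult>> grouped =
    KeyedPCollectionTuple.of(leftTag, keyed_pc1)
        .and(rightTag, keyed_pc2)
        .apply(CoGroupByKey.create());

// Emit the cross product of both sides for each key.
PCollection<KV<String, KV<TableRow, TableRow>>> joined =
    grouped.apply(ParDo.of(
        new DoFn<KV<String, CoGbkResult>, KV<String, KV<TableRow, TableRow>>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Nested iteration: for hot keys the right side is re-iterated
            // once per left element, which is slow when a CoGbkResult
            // holds more than 10000 elements.
            for (TableRow left : c.element().getValue().getAll(leftTag)) {
              for (TableRow right : c.element().getValue().getAll(rightTag)) {
                c.output(KV.of(c.element().getKey(), KV.of(left, right)));
              }
            }
          }
        }));
```

This also shows why skewed keys hurt: a single key with many elements on both sides turns into a quadratic inner loop inside one worker.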

Recommended Answer

The Apache Beam specification doesn't define the execution of the join, and outside the SDK there is no faster way of writing an inner join yourself. Thus, the answer to this question depends on what is executing the join, i.e. which runner. I don't know the Flink or Spark runners, so this answer will be specific to the Dataflow runner.

If you haven't already, take a look at this blog post on the topic. It describes the Dataflow Shuffle service, which can be manually enabled. This service is a better implementation than the current default and generally leads to much faster execution, especially for joins.

To enable the Dataflow Shuffle service, pass in the following flags:

--experiments=shuffle_mode=service
--region=<allowed region>

Where the allowed regions for shuffle are: "us-central1", "europe-west1", "europe-west4", "asia-northeast1".
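If you configure options in code rather than on the command line, the same settings can be applied through the Dataflow runner's pipeline options. A sketch, assuming the Dataflow runner dependency is on the classpath:

```java
import java.util.Collections;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
// Equivalent of --experiments=shuffle_mode=service
options.setExperiments(Collections.singletonList("shuffle_mode=service"));
// Equivalent of --region=<allowed region>; must be one of the
// regions listed above that support the Shuffle service.
options.setRegion("us-central1");

Pipeline pipeline = Pipeline.create(options);
```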
