Joining two large PCollections has a performance issue


Problem description

Joining two PCollections with the CoGroupByKey approach takes hours to execute for 8+ million records. Another Stack Overflow post notes that when a CoGbkResult has more than 10,000 elements, "reiteration (which may be slow) is required."

Any suggestions to improve performance with this approach?

Here is the code snippet:

PCollection<TableRow> pc1 = ...;
PCollection<TableRow> pc2 = ...;

WithKeys<String, TableRow> withKeyValue = 
  WithKeys.of((TableRow row) -> String.format("%s",row.get("KEYNAME")))
          .withKeyType(TypeDescriptors.strings());

PCollection<KV<String,TableRow>> keyed_pc1 =
  pc1.apply("WithKeys pc1", withKeyValue);

PCollection<KV<String,TableRow>> keyed_pc2 = 
  pc2.apply("WithKeys pc2", withKeyValue);

// (org.apache.beam.sdk.extensions.joinlibrary.Join class)
PCollection<KV<String,KV<TableRow,TableRow>>> joinedCollection = 
  Join.innerJoin(keyed_pc1, keyed_pc2); 
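
For context, Join.innerJoin here is built on CoGroupByKey: the two keyed inputs are co-grouped and the per-key value iterables are iterated to emit pairs. Below is a rough sketch of that pattern as a continuation of the snippet above (a hypothetical expansion for illustration, not the library's exact code; the tag and step names are made up). It shows where the CoGbkResult iteration quoted above comes into play:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Tags to tell the two inputs apart inside the CoGbkResult.
final TupleTag<TableRow> pc1Tag = new TupleTag<TableRow>() {};
final TupleTag<TableRow> pc2Tag = new TupleTag<TableRow>() {};

PCollection<KV<String, CoGbkResult>> grouped =
    KeyedPCollectionTuple.of(pc1Tag, keyed_pc1)
        .and(pc2Tag, keyed_pc2)
        .apply("CoGroupByKey", CoGroupByKey.<String>create());

PCollection<KV<String, KV<TableRow, TableRow>>> joined =
    grouped.apply("InnerJoin", ParDo.of(
        new DoFn<KV<String, CoGbkResult>, KV<String, KV<TableRow, TableRow>>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String key = c.element().getKey();
            CoGbkResult result = c.element().getValue();
            // Nested iteration over both sides for this key. When one side
            // has a very large number of elements, the iterable no longer
            // fits in memory and must be re-read from the shuffle, which is
            // the "reiteration (which may be slow)" mentioned above.
            for (TableRow left : result.getAll(pc1Tag)) {
              for (TableRow right : result.getAll(pc2Tag)) {
                c.output(KV.of(key, KV.of(left, right)));
              }
            }
          }
        }));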

Recommended answer

The Apache Beam specification does not define how a join is executed, and there is no faster way of writing the inner join yourself than what the SDK provides. Thus, the answer to this question depends on what is executing the join, i.e. which runner. I don't know the Flink or Spark runners, so this answer is specific to the Dataflow runner.

If you haven't already, take a look at this blog post on the topic. It describes the Dataflow Shuffle Service, which can be manually enabled. The service is a better implementation than the current default and generally leads to much faster execution, especially for joins.

To enable the Dataflow Shuffle Service, pass in the following flags:

--experiments=shuffle_mode=service
--region=<allowed region>

The allowed regions for shuffle are: "us-central1", "europe-west1", "europe-west4", "asia-northeast1".
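
For example, if the job is launched from the Java SDK, these flags can simply be passed on the command line and picked up by PipelineOptionsFactory, or the equivalent options can be set programmatically. A rough sketch, assuming the Dataflow runner dependency is on the classpath and `args` are the program's arguments (the region value is just one of the allowed values above):

import java.util.Collections;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Pass the flags on the command line, e.g.:
//   --runner=DataflowRunner --experiments=shuffle_mode=service --region=us-central1
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);

// ...or set the equivalent options in code:
options.setExperiments(Collections.singletonList("shuffle_mode=service"));
options.setRegion("us-central1"); // any region that supports Dataflow Shuffle

Pipeline pipeline = Pipeline.create(options);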
