How to combine multiple PCollections together and give it as input to a ParDo function


Problem description


I have six PCollections of KV pairs. I want to do a ParDo on another PCollection, giving the combined (6) PCollections as side input.

I tried giving all 6 PCollections as separate side inputs, as below:

PCollection<TableRow> OutputRows = MyCollection.apply(ParDo.withSideInputs(Inp1, Inp2, ...)
    .of(new DoFn<KV<String, String>, TableRow>() {
        ...
    }));

But it's throwing an OutOfMemoryError as the heap space is exceeded. Please advise on how to combine the PCollections to give as input to a ParDo on another PCollection.

Solution

Cloud Dataflow provides several ways of joining.

PCollections used as a side input are broadcast to each worker and loaded into memory. This sounds like what you're doing, and it would explain the OOM if the sum of the PCollection sizes is too big.
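For reference, the side-input approach in the question amounts to roughly the following sketch (using `View.asMap()` from the same SDK; `Inp1` and `MyCollection` are the names from the question, and `inp1View` is a hypothetical name). Each map view is fully materialized in worker memory, so all six must fit in the heap at once, which is the likely source of the OOM:

```java
// Each View.asMap() materializes the whole PCollection as an in-memory Map
// on every worker that reads it -- six of these must fit in the heap together.
final PCollectionView<Map<String, String>> inp1View =
    Inp1.apply(View.<String, String>asMap());
// ... similarly for the other five inputs ...

PCollection<TableRow> outputRows = MyCollection.apply(
    ParDo.withSideInputs(inp1View /*, inp2View, ... */)
         .of(new DoFn<KV<String, String>, TableRow>() {
             @Override
             public void processElement(ProcessContext c) {
                 // Look up values in the broadcast map; all six maps
                 // are resident in memory at the same time.
                 Map<String, String> inp1 = c.sideInput(inp1View);
                 ...
             }
         }));
```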

You mentioned that the values are keyed -- another option is to use a CoGroupByKey.

To do this, you would create a KeyedPCollectionTuple with all of your PCollections, then you would get a result which had all the values for each key. Using a CoGroupByKey like this will shuffle your data around so that the ParDo that consumes the result for a given key will only need to read in the associated values:

PCollection<KV<K, V1>> inp1 = ...;
PCollection<KV<K, V2>> inp2 = ...;

final TupleTag<V1> t1 = new TupleTag<>();
final TupleTag<V2> t2 = new TupleTag<>();
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
  KeyedPCollectionTuple.of(t1, inp1)
                       .and(t2, inp2)
                       .apply(CoGroupByKey.<K>create());

PCollection<T> finalResultCollection =
  coGbkResultCollection.apply(ParDo.of(
    new DoFn<KV<K, CoGbkResult>, T>() {
      @Override
      public void processElement(ProcessContext c) {
        KV<K, CoGbkResult> e = c.element();
        Iterable<V1> pt1Vals = e.getValue().getAll(t1);
        V2 pt2Val = e.getValue().getOnly(t2);
        ... Do Something ....
        c.output(...some T...);
      }
    }));
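Applied to the question's six keyed PCollections, the tuple is built with one TupleTag per input and chained `.and(...)` calls (a sketch; `V1`..`V6` and `inp1`..`inp6` are placeholders for your actual value types and collections):

```java
// One tag per input collection; the tags are later used to pull
// each input's values back out of the CoGbkResult for a given key.
final TupleTag<V1> t1 = new TupleTag<>();
final TupleTag<V2> t2 = new TupleTag<>();
// ... t3 through t6 likewise ...
final TupleTag<V6> t6 = new TupleTag<>();

PCollection<KV<K, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(t1, inp1)
        .and(t2, inp2)
        .and(t3, inp3)
        .and(t4, inp4)
        .and(t5, inp5)
        .and(t6, inp6)
        .apply(CoGroupByKey.<K>create());
```

Because the join is done by shuffle rather than broadcast, a DoFn reading `joined` only sees the values associated with its current key, instead of holding all six collections in memory.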
