How to combine multiple PCollections together and give them as input to a ParDo function
Problem description
I have six PCollections of KV pairs. I want to apply a ParDo to another PCollection, passing the six PCollections combined as a side input.
I tried passing all six PCollections as separate side inputs, as below:
PCollection<TableRow> OutputRows = MyCollection.apply(ParDo.withSideInputs(Inp1, Inp2, ...)
    .of(new DoFn<KV<String, String>, TableRow>() {
      ...
    }));
But it throws an OutOfMemoryError because the heap space is exceeded. Please advise on how to combine the PCollections so that they can be given as input to a ParDo over another PCollection.
Cloud Dataflow provides several ways of joining.
PCollections used as side inputs are broadcast to the workers and loaded into memory. This sounds like what you're doing, and it would explain the OOM if the combined size of the PCollections is too big.
You mentioned that the values are keyed, so another option is to use a CoGroupByKey.
To do this, you would create a KeyedPCollectionTuple with all of your PCollections, and then you would get a result containing all the values for each key. Using a CoGroupByKey like this will shuffle your data around so that the ParDo that consumes the result for a given key only needs to read in the associated values:
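import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TupleTag;

// Two keyed inputs; K, V1, V2 and T are placeholder types.
PCollection<KV<K, V1>> inp1 = ...;
PCollection<KV<K, V2>> inp2 = ...;
// One tag per input, used to pull that input's values back out of the CoGbkResult.
final TupleTag<V1> t1 = new TupleTag<>();
final TupleTag<V2> t2 = new TupleTag<>();
// Group the inputs by key; chain further .and(tag, input) calls for more inputs.
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, inp1)
        .and(t2, inp2)
        .apply(CoGroupByKey.<K>create());
PCollection<T> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
        new DoFn<KV<K, CoGbkResult>, T>() {
          @Override
          public void processElement(ProcessContext c) {
            KV<K, CoGbkResult> e = c.element();
            // All values from inp1 for this key (possibly none).
            Iterable<V1> pt1Vals = e.getValue().getAll(t1);
            // Exactly one value from inp2 is expected; getOnly throws otherwise.
            V2 pt2Val = e.getValue().getOnly(t2);
            // ... Do Something ....
            c.output(...some T...);
          }
        }));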
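The following sketch shows the shape of the result that CoGroupByKey hands to the ParDo, using plain Java and no Dataflow pipeline. It is not the Dataflow API. The class `CoGroupSketch`, the `Pair` helper and the `coGroup`/`getOnly` methods are hypothetical names used only for illustration. The sketch accepts any number of tagged inputs, so it also covers the six inputs in the question. Every key that appears in any input gets an entry for every tag, with an empty list where that input has no values for the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java, in-memory illustration of co-grouping by key.
 * NOT the Dataflow API: all names here are hypothetical.
 */
public class CoGroupSketch {

  /** Minimal key/value pair standing in for Dataflow's KV (hypothetical helper). */
  static final class Pair {
    final String key;
    final String value;

    Pair(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /**
   * Groups several tagged inputs by key. For every key seen in ANY input,
   * the result maps each tag to the (possibly empty) list of that input's values.
   */
  static Map<String, Map<String, List<String>>> coGroup(Map<String, List<Pair>> taggedInputs) {
    Map<String, Map<String, List<String>>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<Pair>> input : taggedInputs.entrySet()) {
      for (Pair p : input.getValue()) {
        Map<String, List<String>> perTag = result.get(p.key);
        if (perTag == null) {
          perTag = new LinkedHashMap<>();
          // Pre-create every tag so inputs without values for this key show up as empty lists.
          for (String tag : taggedInputs.keySet()) {
            perTag.put(tag, new ArrayList<String>());
          }
          result.put(p.key, perTag);
        }
        perTag.get(input.getKey()).add(p.value);
      }
    }
    return result;
  }

  /** Mirrors the getOnly idea: exactly one value is expected, otherwise fail. */
  static String getOnly(Map<String, List<String>> perTag, String tag) {
    List<String> values = perTag.get(tag);
    if (values == null || values.size() != 1) {
      throw new IllegalArgumentException("expected exactly one value for tag " + tag);
    }
    return values.get(0);
  }

  public static void main(String[] args) {
    Map<String, List<Pair>> inputs = new LinkedHashMap<>();
    inputs.put("t1", Arrays.asList(new Pair("a", "x1"), new Pair("a", "x2"), new Pair("b", "x3")));
    inputs.put("t2", Arrays.asList(new Pair("a", "y1"), new Pair("b", "y2")));
    Map<String, Map<String, List<String>>> grouped = coGroup(inputs);
    for (Map.Entry<String, Map<String, List<String>>> e : grouped.entrySet()) {
      // The per-key "ParDo" step only touches the values grouped under that key.
      System.out.println(e.getKey() + " -> t1=" + e.getValue().get("t1")
          + ", t2=" + getOnly(e.getValue(), "t2"));
    }
  }
}
```

Running `main` prints `a -> t1=[x1, x2], t2=y1` followed by `b -> t1=[x3], t2=y2`. In a real pipeline the grouping is performed by a distributed shuffle rather than a single in-memory map. You should also not rely on the order of values within a group. The sketch keeps insertion order only because it uses `LinkedHashMap` and `ArrayList`.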