How to reshuffle a PCollection<T>?


Question


I am trying to implement a Reshuffle transform to prevent excessive fusion, but I don't know how to adapt the version for PCollection<KV<String, String>> so that it works with a plain PCollection<T>. (How to reshuffle a PCollection<KV<String, String>> is described here.)


How would I expand the official Avro I/O example code to reshuffle before adding more steps in my pipeline?

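import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.AvroIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

PCollection<GenericRecord> records =
    p.apply(AvroIO.Read.named("ReadFromAvro")
        .from("gs://my_bucket/path/records-*.avro")
        .withSchema(schema));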

Answer


Thanks to the code snippet provided by the Google support team, I figured it out:


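To get a reshuffled copy of any PCollection<T> (here called data; for the Avro example this would be the records collection), apply the transform: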

PCollection<T> reshuffled = data.apply(Repartition.of());


The Repartition class used:

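import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Prevents fusion by giving each element a random key, grouping by that key
 * (the GroupByKey forces the data to be materialized and redistributed), and
 * then discarding the keys again. The output contains the same elements as
 * the input, in no particular order.
 */
public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> {

    private Repartition() {}

    public static <T> Repartition<T> of() {
        return new Repartition<T>();
    }

    @Override
    public PCollection<T> apply(PCollection<T> input) {
        return input
                // Pair each element with a random Integer key. The coder is set
                // explicitly so the generic type T does not rely on coder inference.
                .apply(ParDo.named("Add arbitrary keys").of(new AddArbitraryKey<T>()))
                .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
                // Grouping by the random key is the fusion barrier.
                .apply(GroupByKey.<Integer, T>create())
                // Drop the keys and emit every element of every group.
                .apply(ParDo.named("Remove arbitrary keys").of(new RemoveArbitraryKey<T>()))
                .setCoder(input.getCoder());
    }

    /** Wraps each element in a KV with a random Integer key. */
    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    /** Unwraps a grouped KV and emits each of its values individually. */
    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            for (T element : c.element().getValue()) {
                c.output(element);
            }
        }
    }
}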
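To make the effect of the three Repartition steps concrete, here is a hypothetical plain-Java simulation with no Dataflow SDK involved. A HashMap stands in for the GroupByKey, and the names RepartitionSimulation, addKeysAndGroup, removeKeys and reshuffle are made up for this illustration. It models only the logical result (the same elements, possibly in a different order), not how the Dataflow service actually distributes the groups across workers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical in-memory model of Repartition: random key, group by key,
 * drop the key. No Dataflow SDK is used here.
 */
public class RepartitionSimulation {

    // Steps 1 and 2: give every element a random int key, then group by that key.
    public static <T> Map<Integer, List<T>> addKeysAndGroup(List<T> input, Random random) {
        Map<Integer, List<T>> groups = new HashMap<>();
        for (T element : input) {
            int key = random.nextInt(); // full int range, like ThreadLocalRandom.nextInt()
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    // Step 3: discard the keys and emit every element of every group.
    public static <T> List<T> removeKeys(Map<Integer, List<T>> groups) {
        List<T> output = new ArrayList<>();
        for (List<T> group : groups.values()) {
            output.addAll(group);
        }
        return output;
    }

    public static <T> List<T> reshuffle(List<T> input, Random random) {
        return removeKeys(addKeysAndGroup(input, random));
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        // Same five elements; the order depends on the keys drawn and HashMap iteration.
        System.out.println(reshuffle(input, new Random(42)));
    }
}
```

Because the keys come from the full int range, most groups end up holding a single element, so the grouping step spreads the data widely. The trade-off is an extra shuffle of the whole collection.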
