How do I write to multiple files in Apache Beam?


Problem description


Let me simplify my case. I'm using Apache Beam 0.6.0. My final processed result is PCollection<KV<String, String>>. And I want to write values to different files corresponding to their keys.

For example, suppose the result contains

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

Then I want to write value1, value3 and value4 to key1.txt, and write value2 to key2.txt.

In my case:

  • The key set is determined while the pipeline is running, not when the pipeline is constructed.
  • The key set may be quite small, but the number of values corresponding to each key can be very large.

Any ideas?

Recommended answer


Handily, I wrote a sample of this case just the other day.

This sample is written in the Dataflow 1.x style.


Basically, you group by each key, and then you can do this with a custom transform that connects to Cloud Storage. The caveat is that the list of lines per file shouldn't be massive: it has to fit into memory on a single instance, but considering you can run high-memory instances, that limit is pretty high.

    ...
    PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
        .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
    readyToWrite.apply(
        new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
    ...

And then the transform that does most of the work is:

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

    private static final Logger LOG = LoggerFactory.getLogger(PTransformWriteToGCS.class);

    // One client per worker JVM; static so it is not serialized with the transform.
    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    private final String bucketName;

    // Maps a key to the GCS object path its values should be written to.
    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
        final SerializableFunction<String, String> pathCreator) {
        this.bucketName = bucketName;
        this.pathCreator = pathCreator;
    }

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {

        return input
            .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {

                @Override
                public void processElement(
                    final DoFn<KV<String, List<String>>, Void>.ProcessContext c)
                    throws Exception {
                    final String key = c.element().getKey();
                    final List<String> values = c.element().getValue();
                    // Join all values for this key into one newline-separated body.
                    final String toWrite = values.stream().collect(Collectors.joining("\n"));
                    final String path = pathCreator.apply(key);
                    final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                        .setContentType(MimeTypes.TEXT)
                        .build();
                    LOG.info("blob writing to: {}", blobInfo);
                    // Upload the joined values as a single object; one file per key.
                    STORAGE.create(blobInfo, toWrite.getBytes(StandardCharsets.UTF_8));
                }
            }));
    }
}
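
The sample above calls AccumulatorOfWords.getCombineFn(), which isn't shown in the answer. As a minimal sketch of what it could return (the class name ToListFn and the whole implementation below are my own assumption, not code from the original answer), a CombineFn that gathers every value of a key into a List<String> might look like this:

import java.util.ArrayList;
import java.util.List;

import com.google.cloud.dataflow.sdk.transforms.Combine;

// Hypothetical stand-in for AccumulatorOfWords.getCombineFn(): collects all
// values of a key into a single List<String>. Assumes, as the answer notes,
// that each key's full value list fits in memory on one worker.
public class ToListFn extends Combine.CombineFn<String, List<String>, List<String>> {

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<String> addInput(final List<String> accumulator, final String input) {
        accumulator.add(input);
        return accumulator;
    }

    @Override
    public List<String> mergeAccumulators(final Iterable<List<String>> accumulators) {
        final List<String> merged = new ArrayList<>();
        for (final List<String> accumulator : accumulators) {
            merged.addAll(accumulator);
        }
        return merged;
    }

    @Override
    public List<String> extractOutput(final List<String> accumulator) {
        return accumulator;
    }
}

With that in place, applying Combine.perKey(new ToListFn()) to the PCollection<KV<String, String>> from the question produces the PCollection<KV<String, List<String>>> that PTransformWriteToGCS expects.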

