Writing to GCS with Dataflow using element count


Problem description

This is in reference to Apache Beam SDK Version 2.2.0.

I'm attempting to use AfterPane.elementCountAtLeast(...) but not having any success so far. What I want looks a lot like Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn, but needs to be adapted to 2.2.0. Ultimately I just need a simple OR where a file is written after X elements OR Y time has passed. I intend to set the time very high so that the write happens on the number of elements in the majority of cases, and only writes based on duration during times of very low message volume.
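Expressed in Beam's trigger API, that OR is a composite trigger. A minimal sketch, with the 250-element count and the 10-minute delay as placeholder values:

import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.joda.time.Duration;

// Fires as soon as either condition is satisfied first: at least X buffered
// elements, or Y processing time since the first element in the pane.
Trigger xElementsOrYTime = Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(250),                    // X (placeholder)
        AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))));  // Y (placeholder)

AfterFirst.of(...) fires when whichever sub-trigger is satisfied first, and Repeatedly.forever(...) re-arms it so every subsequent pane fires the same way.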

Using GCP Dataflow 2.0 PubSub to GCS as a reference, here's what I've tried:

String bucketPath =
    String.format("gs://%s/%s",
        options.getBucketName(),
        options.getDestinationDirName());

// Global window that fires a pane each time at least 250 elements arrive.
PCollection<String> windowedValues = stringMessages
    .apply("Create windows",
        Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
        .discardingFiredPanes());

// Windowed writes are required when writing an unbounded PCollection.
windowedValues
    .apply("Write to GCS",
        TextIO
            .write()
            .to(bucketPath)
            .withNumShards(options.getNumShards())
            .withWindowedWrites());

Where stringMessages is a PCollection<String> read from an Avro-encoded Pub/Sub subscription. There is some unpacking happening upstream to get the events converted to strings, but no merging/partitioning/grouping, just transforms.

Element count is hard-coded at 250 just for the PoC. Once it is proven, it will likely be cranked up to the tens or hundreds of thousands.

The problem

This implementation has resulted in text files of various lengths. The file lengths start very high (1000s of elements) when the job first starts up (presumably while processing backlogged data), and then stabilize at some point. I've tried altering numShards between 1 and 10. At 1, the element count of the written files stabilizes at 600; with 10, it stabilizes at 300.

What am I missing here?

As a side note, this is only step 1. Once I figure out writing using element count, I still need to figure out writing these files as compressed json (.json.gz) as opposed to plain-text files.

Solution

Posting what I learned for reference by others.

What was not clear to me when I wrote this is the following from the Apache Beam Documentation:

Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis

With this knowledge, I rethought my pipeline a bit. From the FileIO documentation under Writing files -> How many shards are generated per pane:

Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.

So I decided to use FileIO's writeDynamic to perform the writes and specify withNumShards in order to get the implicit GroupByKey. The final result looks like this:

PCollection<String> windowedValues = validMessageStream.apply(Window
            .<String>configure()
            // Fire when 2000 elements have arrived OR the configured duration
            // has passed since the pane's first element, whichever comes first.
            .triggering(Repeatedly.forever(AfterFirst.of(
                    AfterPane.elementCountAtLeast(2000),
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                            Duration.standardSeconds(windowDurationSeconds)))))
            .discardingFiredPanes());

windowedValues.apply(FileIO.<String, String>writeDynamic()
            // Route each element to a destination; Event::getKey is this
            // pipeline's own key extractor for the element type being written.
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://data_pipeline_events_test/events/")
            .withDestinationCoder(StringUtf8Coder.of())
            // A fixed shard count forces the implicit GroupByKey noted above.
            .withNumShards(1)
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
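As for the side note in the question about compressed output: the same write can gzip each file. A minimal sketch, assuming a Beam release where FileIO.Write#withCompression is available (it landed after 2.2.0); the destination key function here is a placeholder:

// Requires org.apache.beam.sdk.io.Compression.
windowedValues.apply(FileIO.<String, String>writeDynamic()
            .by(element -> "events")  // placeholder destination key
            .via(TextIO.sink())
            .to("gs://data_pipeline_events_test/events/")
            .withDestinationCoder(StringUtf8Coder.of())
            .withNumShards(1)
            // Gzip each finalized shard file and name it to match.
            .withCompression(Compression.GZIP)
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".json.gz")));

TextIO.sink() still writes one element per line; the gzip wrapping is applied to each shard file as it is finalized.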
