Writing to GCS with Dataflow using element count

Problem description

This is in reference to Apache Beam SDK Version 2.2.0.

I'm attempting to use AfterPane.elementCountAtLeast(...) but not having any success so far. What I want looks a lot like Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn, but needs to be adapted to 2.2.0. Ultimately I just need a simple OR where a file is written after X elements OR Y time has passed. I intend to set the time very high so that the write happens on the number of elements in the majority of cases, and only writes based on duration during times of very low message volume.
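
Roughly, the trigger being described is a composite built from AfterPane.elementCountAtLeast and a processing-time trigger, combined with AfterFirst so that whichever condition is met first fires. A minimal sketch (the count and delay below are illustrative placeholders, not values from the original pipeline):

// Fire when either 100,000 elements have arrived OR 30 minutes have passed
// since the first element in the pane, whichever happens first.
Trigger elementsOrTime = Repeatedly.forever(
    AfterFirst.of(
        AfterPane.elementCountAtLeast(100000),
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(30))));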

Using GCP Dataflow 2.0 PubSub to GCS as a reference, here's what I've tried:

String bucketPath =
    String.format("gs://%s/%s", 
        options.getBucketName(), 
        options.getDestinationDirName());

PCollection<String> windowedValues = stringMessages
    .apply("Create windows",
        Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
        .discardingFiredPanes());

windowedValues
    .apply("Write to GCS",
        TextIO
            .write()
            .to(bucketPath)
            .withNumShards(options.getNumShards())
            .withWindowedWrites());

Where stringMessages is a PCollection that is reading from an Avro-encoded pubsub subscription. There is some unpacking happening upstream to get the events converted to strings, but no merging/partitioning/grouping, just transforms.
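
For completeness, a hypothetical sketch of what that upstream portion might look like; the subscription option, transform names, and the payload-to-String conversion here are assumptions (the actual Avro decoding is elided):

PCollection<String> stringMessages = pipeline
    .apply("Read from PubSub",
        PubsubIO.readMessages().fromSubscription(options.getSubscription()))
    .apply("Unpack to String", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Placeholder for the Avro unpacking described above.
            c.output(new String(c.element().getPayload(), StandardCharsets.UTF_8));
        }
    }));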

Element count is hard coded at 250 just for PoC. Once it is proven, it will likely be cranked up to the 10s or 100s of thousands range.

Problem

This implementation has resulted in text files of various lengths. The file lengths start very high (1000s of elements) when the job first starts up (presumably while processing backlogged data), and then stabilize at some point. I've tried altering 'numShards' to 1 and 10. At 1, the element count of the written files stabilizes at 600; at 10, it stabilizes at 300.

What am I missing here?

As a side note, this is only step 1. Once I figure out writing using element count, I still need to figure out writing these files as compressed JSON (.json.gz) rather than plain-text files.

Recommended answer

Posting what I learned for reference by others.

What was not clear to me when I wrote this is the following from the Apache Beam Documentation:

Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis

With this knowledge, I rethought my pipeline a bit. From the FileIO documentation under Writing files -> How many shards are generated per pane:

Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.

So I decided to use FileIO's writeDynamic to perform the writes and specify withNumShards in order to get the implicit GroupByKey. The final result looks like this:

PCollection<String> windowedValues = validMessageStream.apply(Window
            .<String>configure()
            .triggering(Repeatedly.forever(AfterFirst.of(
                    AfterPane.elementCountAtLeast(2000),
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                            Duration.standardSeconds(windowDurationSeconds)))))
            .discardingFiredPanes());

windowedValues.apply(FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://data_pipeline_events_test/events/")
            .withDestinationCoder(StringUtf8Coder.of())
            .withNumShards(1)
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
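
As a follow-up to the compressed-output side note in the question: this is not part of the original answer, but newer Beam SDKs expose FileIO.Write#withCompression, so one possible (unverified) extension of the write above for gzipped JSON would be:

// Same write as above, but gzip each shard and name the files *.json.gz.
windowedValues.apply(FileIO.<String, String>writeDynamic()
        .by(Event::getKey)
        .via(TextIO.sink())
        .to("gs://data_pipeline_events_test/events/")
        .withDestinationCoder(StringUtf8Coder.of())
        .withNumShards(1)
        .withCompression(Compression.GZIP)
        .withNaming(key -> FileIO.Write.defaultNaming(key, ".json.gz")));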
