Writing to GCS with Dataflow using element count
Question
This is in reference to Apache Beam SDK version 2.2.0.
I'm attempting to use AfterPane.elementCountAtLeast(...) but not having any success so far. What I want looks a lot like Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn, but it needs to be adapted to 2.2.0. Ultimately I just need a simple OR: a file is written after X elements OR after Y time has passed. I intend to set the time very high so that, in the majority of cases, the write happens on element count, and writes happen based on duration only during times of very low message volume.
Using GCP Dataflow 2.0 PubSub to GCS as a reference, here's what I've tried:
String bucketPath =
    String.format("gs://%s/%s",
        options.getBucketName(),
        options.getDestinationDirName());

PCollection<String> windowedValues = stringMessages
    .apply("Create windows",
        Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
            .discardingFiredPanes());

windowedValues
    .apply("Write to GCS",
        TextIO
            .write()
            .to(bucketPath)
            .withNumShards(options.getNumShards())
            .withWindowedWrites());
Where stringMessages is a PCollection read from an Avro-encoded PubSub subscription. There is some unpacking happening upstream to get the events converted to strings, but no merging/partitioning/grouping, just transforms.
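The upstream unpacking isn't shown in the question; as an illustration only, it might look something like the following hypothetical DoFn, where Event stands in for the Avro-decoded message type and toJson() is an assumed serialization helper (neither name comes from the question):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical unpacking step: converts each decoded Avro record to a
// String, element by element, with no grouping or merging involved.
class EventToString extends DoFn<Event, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element().toJson()); // toJson() is an assumed helper
  }
}

// Applied upstream of the windowing shown above:
PCollection<String> stringMessages =
    avroEvents.apply("Unpack to strings", ParDo.of(new EventToString()));
```

Because this is a plain per-element ParDo, it has no effect on windowing or triggering; the trigger behavior discussed below is determined entirely by the Window transform.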
The element count is hard-coded at 250 just for the PoC. Once it is proven, it will likely be cranked up to the tens or hundreds of thousands.
Problem
This implementation has resulted in text files of various lengths. The file lengths start very high (1000s of elements) when the job first starts up (presumably processing backlogged data) and then stabilize at some point. I've tried altering numShards between 1 and 10. At 1, the element count of the written files stabilizes at 600; at 10, it stabilizes at 300.
What am I missing here?
As a side note, this is only step 1. Once I figure out writing using element count, I still need to figure out how to write these files as compressed JSON (.json.gz) as opposed to plain-text files.
Answer
Posting what I learned for reference by others.
What was not clear to me when I wrote this is the following from the Apache Beam documentation:
Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis.
With this knowledge, I rethought my pipeline a bit. From the FileIO documentation under Writing files -> How many shards are generated per pane:
Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.
So I decided to use FileIO's writeDynamic to perform the writes and specify withNumShards in order to get the implicit GroupByKey. The final result looks like this:
PCollection<String> windowedValues = validMessageStream.apply(Window
    .<String>configure()
    .triggering(Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(2000),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
            Duration.standardSeconds(windowDurationSeconds)))))
    .discardingFiredPanes());

windowedValues.apply(FileIO.<String, String>writeDynamic()
    .by(Event::getKey)
    .via(TextIO.sink())
    .to("gs://data_pipeline_events_test/events/")
    .withDestinationCoder(StringUtf8Coder.of())
    .withNumShards(1)
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
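For the question's side note about compressed output: on newer Beam SDKs (this method may not exist in 2.2.0 itself), FileIO.Write exposes withCompression, so a sketch of the gzip variant of the write above could look like this:

```java
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// Sketch only: same write as above, but gzip-compressing each file and
// naming it with a .json.gz suffix. Assumes a Beam version where
// FileIO.Write.withCompression and the Compression enum are available.
windowedValues.apply(FileIO.<String, String>writeDynamic()
    .by(Event::getKey)
    .via(TextIO.sink())
    .to("gs://data_pipeline_events_test/events/")
    .withDestinationCoder(StringUtf8Coder.of())
    .withNumShards(1)
    .withCompression(Compression.GZIP)
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".json.gz")));
```

The suffix passed to defaultNaming is just a label, so it should match the compression actually configured; Compression.GZIP is what makes the bytes gzip-encoded.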