Write streaming data to GCS using Apache Beam

Question

How can I write messages received from Pub/Sub to text files in GCS using TextIO in Apache Beam? I have seen methods such as withWindowedWrites() and withFilenamePolicy(), but I could not find an example of either in the documentation.

Recommended answer

Here is an example, assuming you are using the Java SDK (Beam 2.1.0).

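import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

pipeline
    // Read messages as strings; event time comes from the "timestamp" attribute.
    .apply("PubsubIO", PubsubIO.readStrings()
        .withTimestampAttribute("timestamp")
        .fromSubscription("projects/YOUR-PROJECT/subscriptions/YOUR-SUBSCRIPTION"))
    // Group the unbounded stream into 30-second fixed windows.
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L))))
    // Write one set of files per window; some SDK versions and runners may also
    // require an explicit shard count via withNumShards(...) for unbounded input.
    .apply(TextIO.write().to("gs://YOUR-BUCKET").withWindowedWrites());

pipeline.run();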
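
The Window step in the pipeline above places each message into a 30-second fixed window based on its event timestamp. As a rough illustration of what that assignment means, here is a minimal plain-Java sketch. It assumes windows aligned to the Unix epoch (zero offset), and the class and method names are hypothetical helpers, not part of the Beam API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of the Beam API.
final class FixedWindowSketch {

    // Returns the start of the fixed window that contains the timestamp,
    // assuming windows are aligned to the Unix epoch (zero offset).
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long tsMillis = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(tsMillis - Math.floorMod(tsMillis, sizeMillis));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(30);
        Instant start = windowStart(Instant.parse("2017-08-01T12:00:47Z"), size);
        // Prints the half-open interval [start, end) of the window.
        System.out.println("[" + start + ", " + start.plus(size) + ")");
    }
}
```

Every message whose timestamp falls in the same interval lands in the same window, which is why a windowed write produces a separate set of files per window.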

You can see the defaults that the SDK uses for file naming by reading the expand method, TextIO.Write.expand(PCollection input). In particular, take a look at DefaultFilenamePolicy.java.
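
For windowed writes, a filename policy generally has to produce a name that is unique per window and per shard, otherwise files from different windows would collide. The following plain-Java sketch shows one way such a name could be composed. The naming scheme, class name, and method signature here are assumptions made for illustration; this is not the output format of DefaultFilenamePolicy, and it does not implement Beam's FilenamePolicy interface.

```java
import java.time.Instant;

// Hypothetical naming scheme for illustration; not Beam's DefaultFilenamePolicy.
final class WindowedFilenameSketch {

    // Builds a file name that encodes the window bounds and the shard index,
    // so each (window, shard) pair maps to a distinct file.
    static String windowedFilename(String prefix, Instant windowStart, Instant windowEnd,
                                   int shard, int numShards, String suffix) {
        return String.format("%s-%s-%s-%05d-of-%05d%s",
                prefix, windowStart, windowEnd, shard, numShards, suffix);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-08-01T12:00:30Z");
        Instant end = start.plusSeconds(30);
        System.out.println(
                windowedFilename("gs://YOUR-BUCKET/output", start, end, 0, 3, ".txt"));
    }
}
```

In a real pipeline the window bounds and shard numbers would come from the SDK, so consult the FilenamePolicy API for your Beam version before adapting this idea.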
