使用 Apache Beam 将流数据写入 GCS [英] Write streaming data to GCS using Apache Beam

查看:37
本文介绍了使用 Apache Beam 将流数据写入 GCS的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使用 Apache Beam 中的 TextIO 将来自 PubSub 的消息写入 GCS 中的文本文件?看到了一些方法,如 withWindowedWrites() 和 withFilenamePolicy() 但在文档中找不到任何示例.

How to write messages received from PubSub to a text file in GCS using TextIO in Apache Beam? Saw some methods like withWindowedWrites() and withFilenamePolicy() but couldn't find any example of it in the documentation.

推荐答案

这是一个示例,前提是您使用的是 Java SDK (BEAM 2.1.0).

Here is an example provided you are using the Java SDK (BEAM 2.1.0).

PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                                                    .withValidation()
                                                    .as(PipelineOptions.class);

Pipeline pipeline = Pipeline.create(options);

pipeline.begin()
               .apply("PubsubIO",PubsubIO.readStrings()
                     .withTimestampAttribute("timestamp")
                     .fromSubscription("projects/YOUR-PROJECT/subscriptions/YOUR-SUBSCRIPTION"))
               .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L))))
               .apply(TextIO.write().to("gs://YOUR-BUCKET").withWindowedWrites());

您可以通过探索 TextIO.Write.expand(PCollection input) 中的expand"方法来查看 SDK 用于文件命名的默认值.具体我会看看 DefaultFilenamePolicy.java

You can see the defaults that the SDK uses for the file naming by exploring the "expand" method in TextIO.Write.expand(PCollection input). Specifically I'd take a look at DefaultFilenamePolicy.java

这篇关于使用 Apache Beam 将流数据写入 GCS的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆