Using defaultNaming for dynamic windowed writes in Apache Beam

Problem description

I am following along with the answer to this post and the documentation in order to perform a dynamic windowed write on my data at the end of a pipeline. Here is what I have so far:

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(
        FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://some_bucket/events/")
            .withNaming(key -> defaultNaming(key, ".json")));
}

But NetBeans reports an error on the last line:

FileNaming is not public in Write; cannot be accessed from outside package

How do I make defaultNaming available to my pipeline so that I can use it for dynamic writes? Or, if that isn't possible, what should I be doing instead?

Recommended answer

Posting what I figured out in case someone else comes across this.

There were three issues with how I was attempting to use writeDynamic() before.

  1. Previously I had been using Beam 2.3.0, in which FileNaming is a non-public type nested inside FileIO.Write, so it cannot be referenced from outside the package. Beam 2.4.0 defines FileNaming as a public static interface, making it available externally.
  2. Fully qualifying defaultNaming. Rather than calling defaultNaming unqualified, as the documentation example does, it has to be invoked as FileIO.Write.defaultNaming, because FileIO is the class I actually imported (a static import of FileIO.Write.defaultNaming would also work).
  3. Calling withDestinationCoder was also required to perform the dynamic write.
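To make the role of defaultNaming more concrete: a lambda such as key -> defaultNaming(key, ".json") passed to withNaming maps each destination key to a naming function, which FileIO then calls for every window, pane and shard it writes. The plain-Java sketch below imitates that shape without depending on Beam. SimpleNaming and approxDefaultNaming are hypothetical stand-ins: the real FileIO.Write.FileNaming receives a BoundedWindow, a PaneInfo, shard counts and a Compression value, and the exact file-name format Beam's defaultNaming produces may differ from this approximation.

```java
import java.util.function.Function;

public class NamingSketch {

    // Hypothetical stand-in for Beam's FileIO.Write.FileNaming: given a window's
    // bounds and the shard position, return a file name. The real interface takes
    // BoundedWindow, PaneInfo, numShards, shardIndex and Compression instead.
    @FunctionalInterface
    interface SimpleNaming {
        String getFilename(String windowStart, String windowEnd, int shardIndex, int numShards);
    }

    // Hypothetical approximation of defaultNaming(prefix, suffix): join the prefix,
    // the window bounds and a zero-padded "shard-of-total" counter, then the suffix.
    static SimpleNaming approxDefaultNaming(String prefix, String suffix) {
        return (start, end, shardIndex, numShards) ->
                String.format("%s-%s-%s-%05d-of-%05d%s",
                        prefix, start, end, shardIndex, numShards, suffix);
    }

    public static void main(String[] args) {
        // Same shape as withNaming(key -> defaultNaming(key, ".json")):
        // one naming function is produced per destination key.
        Function<String, SimpleNaming> namingPerKey = key -> approxDefaultNaming(key, ".json");

        // Hypothetical window bounds, chosen only for illustration.
        String name = namingPerKey.apply("login")
                .getFilename("2018-04-01T10:00:00.000Z", "2018-04-01T10:05:00.000Z", 0, 1);
        System.out.println(name);
    }
}
```

Returning a separate naming function per key is what lets each destination get its own file prefix while all destinations share the same windowing and sharding scheme.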

The final solution ended up looking like this.

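import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(FileIO.<String, String>writeDynamic()
                .by(Event::getKey)                          // destination key for each element
                .via(TextIO.sink())                         // write each element as a line of text
                .to("gs://some_bucket/events/")             // base output directory
                .withDestinationCoder(StringUtf8Coder.of()) // coder for the String destination key
                .withNumShards(1)                           // one shard per destination per pane
                .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}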

Here, Event::getKey refers to a static method defined in the same package with the signature public static String getKey(String event).
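The answer does not show how Event::getKey derives a key. As a hypothetical example, assuming each event is a flat JSON string with a "type" field, a method with the stated signature could look like the sketch below. The regex and the "unknown" fallback are illustrative assumptions, not the author's code.

```java
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Event {

    // Hypothetical event format: a flat JSON object with a string "type" field,
    // e.g. {"type":"login","user":"42"}. This regex is only an illustration.
    private static final Pattern TYPE_FIELD =
            Pattern.compile("\"type\"\\s*:\\s*\"([^\"]*)\"");

    // Same signature as in the answer; the returned key becomes the destination.
    public static String getKey(String event) {
        Matcher m = TYPE_FIELD.matcher(event);
        // Fall back to a fixed destination when the field is missing.
        return m.find() ? m.group(1) : "unknown";
    }

    public static void main(String[] args) {
        // A static String -> String method fits any matching functional interface,
        // which is why it can be passed as .by(Event::getKey).
        Function<String, String> keyFn = Event::getKey;
        System.out.println(keyFn.apply("{\"type\":\"login\",\"user\":\"42\"}")); // prints login
        System.out.println(keyFn.apply("{\"user\":\"42\"}"));                  // prints unknown
    }
}
```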

This performs a windowed write that produces one file per destination per window (pane), because of .withNumShards(1). It assumes windowing has already been applied in an earlier step. No GroupByKey is needed before the write: when the number of shards is set explicitly, the grouping happens as part of the write itself. See the FileIO documentation, under "Writing files -> How many shards are generated per pane", for more details.
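To illustrate what "one file per window" means once dynamic destinations are involved, the following local simulation (plain Java, not Beam code) groups hypothetical timestamped events by destination key and 60-second fixed window. With a single shard, each (key, window) group maps to exactly one file. TimedEvent, filesFor and the file-name pattern are assumptions made for this sketch; in a real pipeline the names come from the configured FileNaming and may also include a pane index.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowedWriteSimulation {

    // Hypothetical input record: a destination key plus an event time in seconds.
    static final class TimedEvent {
        final String key;
        final long timestampSec;

        TimedEvent(String key, long timestampSec) {
            this.key = key;
            this.timestampSec = timestampSec;
        }
    }

    // Start of the fixed window containing ts (assumes non-negative timestamps).
    static long windowStart(long ts, long windowSizeSec) {
        return ts - (ts % windowSizeSec);
    }

    // Groups events by (destination key, window). With a single shard, each group
    // corresponds to one output file, so the map size equals the file count.
    static Map<String, Integer> filesFor(List<TimedEvent> events, long windowSizeSec) {
        Map<String, Integer> recordsPerFile = new TreeMap<>();
        for (TimedEvent e : events) {
            long start = windowStart(e.timestampSec, windowSizeSec);
            String file = e.key + "-" + start + "-" + (start + windowSizeSec)
                    + "-00000-of-00001.json";
            recordsPerFile.merge(file, 1, Integer::sum);
        }
        return recordsPerFile;
    }

    public static void main(String[] args) {
        List<TimedEvent> events = List.of(
                new TimedEvent("login", 5),
                new TimedEvent("login", 42),
                new TimedEvent("logout", 30),
                new TimedEvent("login", 65));
        // 60-second windows: "login" events fall into two windows, "logout" into one.
        filesFor(events, 60).forEach(
                (file, n) -> System.out.println(file + " <- " + n + " record(s)"));
    }
}
```

Raising the shard count in this model would multiply each (key, window) group into that many files, which is the trade-off the FileIO documentation describes for withNumShards.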
