Using defaultNaming for dynamic windowed writes in Apache Beam

Question

I am following along with the answer to this post and the documentation in order to perform a dynamic windowed write on my data at the end of a pipeline. Here is what I have so far:

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(
        FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://some_bucket/events/")
            .withNaming(key -> defaultNaming(key, ".json")));
}

But NetBeans warns me about a syntax error on the last line:

FileNaming is not public in Write; cannot be accessed from outside package

How do I make defaultNaming available to my pipeline so that I can use it for dynamic writes? Or, if that isn't possible, what should I be doing instead?

Recommended answer

Posting what I figured out in case someone else comes across this.

There were three issues with how I was attempting to use writeDynamic() before.

  1. Previously I had been using Beam version 2.3.0, which declares FileNaming as an interface internal to FileIO.Write that is not public. Beam 2.4.0 defines FileNaming as a public static interface, making it available externally.
  2. Fully qualifying defaultNaming. Rather than calling defaultNaming directly, as the example documentation does, it must be invoked as FileIO.Write.defaultNaming, since FileIO is the class I actually imported.
  3. Adding withDestinationCoder was also required to perform the dynamic write.

The final solution ended up looking like this.

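import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(
        FileIO.<String, String>writeDynamic()
            .by(Event::getKey)                          // destination key for each element
            .via(TextIO.sink())
            .to("gs://some_bucket/events/")
            .withDestinationCoder(StringUtf8Coder.of()) // coder for the String destination key
            .withNumShards(1)
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}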

Here Event::getKey refers to a static method defined in the same package, with the signature public static String getKey(String event).
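
The answer does not show the body of getKey, so the following is only a minimal sketch of what such a method might look like. It assumes, purely for illustration, that each event is a JSON string carrying a top-level "type" field; the field name, the regular expression, and the "unknown" fallback are hypothetical and not taken from the original post.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class Event {
    // Hypothetical: matches a "type":"value" pair inside a JSON string.
    private static final Pattern TYPE_FIELD =
        Pattern.compile("\"type\"\\s*:\\s*\"([^\"]*)\"");

    private Event() {}

    /** Returns the destination key for an event, or "unknown" if no type field is found. */
    public static String getKey(String event) {
        Matcher m = TYPE_FIELD.matcher(event);
        return m.find() ? m.group(1) : "unknown";
    }

    public static void main(String[] args) {
        // Example input; the field names are assumptions for this sketch.
        System.out.println(getKey("{\"type\":\"click\",\"user\":42}"));
    }
}
```

Whatever the real implementation is, it should return the same key for the same input, because the key decides which destination each element is written to. A real pipeline would more likely use a JSON parser than a regular expression; the regex is used here only to keep the sketch free of dependencies.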

This performs a windowed write that produces one file per window for each key (as set by the .withNumShards(1) call). It assumes the windowing has already been applied in a previous step. A GroupByKey is not required before the write, because the write performs one itself whenever the number of shards is set explicitly. See the FileIO documentation for more details under "Writing files -> How many shards are generated per pane".
