How to use FileIO.writeDynamic() in Apache Beam 2.6 to write to multiple output paths?


Problem description

I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to alter the pipeline so that it is reading multiple topics and writing them out as gs://bucket/topic/...

When reading only a single topic I used TextIO in the last step of my pipeline:

TextIO.write()
    .to(
        new DateNamedFiles(
            String.format("gs://bucket/data%s/", suffix), currentMillisString))
    .withWindowedWrites()
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1);

This is a similar question, whose code I tried to adapt:

FileIO.<EventType, Event>writeDynamic()
    .by(
        new SerializableFunction<Event, EventType>() {
          @Override
          public EventType apply(Event input) {
            return EventType.TRANSFER; // should return real type here, just a dummy
          }
        })
    .via(
        Contextful.fn(
            new SerializableFunction<Event, String>() {
              @Override
              public String apply(Event input) {
                return "Dummy"; // should return the Event converted to a String
              }
            }),
        TextIO.sink())
    .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
                                                            currentMillisString),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String input) {
            return null; // Not sure what this should do exactly, but it needs
                         // to include the EventType in the path
          }
        }))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1);

The official JavaDoc contains example code which seems to have outdated method signatures (the .via method seems to have switched the order of its arguments). Furthermore, I stumbled across this example in FileIO, which confused me - shouldn't TransactionType and Transaction in this line change places?

Answer

After a night of sleep and a fresh start, I figured out the solution. I used the functional Java 8 style, as it makes the code shorter (and more readable):

  .apply(
    FileIO.<String, Event>writeDynamic()
        .by((SerializableFunction<Event, String>) input -> input.getTopic())
        .via(
            Contextful.fn(
                (SerializableFunction<Event, String>) input -> input.getPayload()),
            TextIO.sink())
        .to(String.format("gs://bucket/data%s/", suffix))
        .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
        .withDestinationCoder(StringUtf8Coder.of())
        .withTempDirectory(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
        .withNumShards(1));

Explanation:

  • Event is a Java POJO containing the payload of the Kafka message and the topic it belongs to; it is parsed in a ParDo after the KafkaIO step
  • suffix is either dev or empty, set by environment variables
  • currentMillisString contains the timestamp when the whole pipeline was launched, so that new files don't overwrite old files on GCS when a pipeline gets restarted
  • FileNaming implements a custom naming scheme and receives the type of the event (the topic) in its constructor; it uses a custom formatter to write to daily partitioned "sub-folders" on GCS:

class FileNaming implements FileIO.Write.FileNaming {
  static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
    return new FileNaming(topic, suffix, currentMillisString);
  }

  private static final DateTimeFormatter FORMATTER = DateTimeFormat
      .forPattern("yyyy-MM-dd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));

  private final String topic;
  private final String suffix;
  private final String currentMillisString;

  private String filenamePrefixForWindow(IntervalWindow window) {
    return String.format(
        "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
  }

  private FileNaming(String topic, String suffix, String currentMillisString) {
    this.topic = topic;
    this.suffix = suffix;
    this.currentMillisString = currentMillisString;
  }

  @Override
  public String getFilename(
      BoundedWindow window,
      PaneInfo pane,
      int numShards,
      int shardIndex,
      Compression compression) {

    IntervalWindow intervalWindow = (IntervalWindow) window;
    String filenamePrefix = filenamePrefixForWindow(intervalWindow);
    String filename =
        String.format(
            "pane-%d-%s-%05d-of-%05d%s",
            pane.getIndex(),
            pane.getTiming().toString().toLowerCase(),
            shardIndex,
            numShards,
            suffix);
    String fullName = filenamePrefix + filename;
    return fullName;
  }
}
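To see what kind of paths this naming scheme produces, the formatting logic can be sketched in isolation with plain java.time (the real class uses Joda-Time and Beam's IntervalWindow/PaneInfo; the class and method names below are hypothetical and exist only for illustration):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class FileNamingSketch {

    // Mirrors FileNaming.getFilename(): "<topic>/<yyyy-MM-dd>/<launchMillis>_pane-<i>-<timing>-<shard>-of-<total><suffix>"
    static String filename(String topic, LocalDate windowStart, String currentMillisString,
                           long paneIndex, String paneTiming, int shardIndex, int numShards, String suffix) {
        // Daily partitioned "sub-folder" prefix, one directory level per topic and per day.
        String prefix = String.format(
            "%s/%s/%s_", topic, windowStart.format(DateTimeFormatter.ISO_LOCAL_DATE), currentMillisString);
        // Pane/shard part, identical to the String.format pattern in getFilename().
        String name = String.format(
            "pane-%d-%s-%05d-of-%05d%s", paneIndex, paneTiming, shardIndex, numShards, suffix);
        return prefix + name;
    }

    public static void main(String[] args) {
        System.out.println(filename(
            "transfer", LocalDate.of(2018, 9, 24), "1537777777000", 0, "on_time", 0, 1, ""));
    }
}
```

So a single on-time shard of the "transfer" topic would land under gs://bucket/data/transfer/2018-09-24/, keyed by the pipeline launch timestamp so a restart never clobbers earlier files.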

