How to use FileIO.writeDynamic() in Apache Beam 2.6 to write to multiple output paths?


Question

I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to alter the pipeline so that it is reading multiple topics and writing them out as gs://bucket/topic/...

When reading only a single topic I used TextIO in the last step of my pipeline:

TextIO.write()
    .to(
        new DateNamedFiles(
            String.format("gs://bucket/data%s/", suffix), currentMillisString))
    .withWindowedWrites()
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1);

This is a similar question, whose code I tried to adapt.

FileIO.<EventType, Event>writeDynamic()
    .by(
        new SerializableFunction<Event, EventType>() {
          @Override
          public EventType apply(Event input) {
            return EventType.TRANSFER; // should return real type here, just a dummy
          }
        })
    .via(
        Contextful.fn(
            new SerializableFunction<Event, String>() {
              @Override
              public String apply(Event input) {
                return "Dummy"; // should return the Event converted to a String
              }
            }),
        TextIO.sink())
    .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
                                                            currentMillisString),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String input) {
            return null; // Not sure what this should do exactly, but it needs to
                         // include the EventType in the path
          }
        }))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1);

The official JavaDoc contains example code which seems to have outdated method signatures (the .via method seems to have switched the order of the arguments). I furthermore stumbled across the example in FileIO which confused me - shouldn't TransactionType and Transaction in this line change places?

Answer

After a night of sleep and a fresh start I figured out the solution. I used the functional Java 8 style as it makes the code shorter (and more readable):

  .apply(
    FileIO.<String, Event>writeDynamic()
        .by((SerializableFunction<Event, String>) input -> input.getTopic())
        .via(
            Contextful.fn(
                (SerializableFunction<Event, String>) input -> input.getPayload()),
            TextIO.sink())
        .to(String.format("gs://bucket/data%s/", suffix))
        .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
        .withDestinationCoder(StringUtf8Coder.of())
        .withTempDirectory(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
        .withNumShards(1));

Explanation:

  • Event is a Java POJO containing the payload of the Kafka message and the topic it belongs to; it is parsed in a ParDo after the KafkaIO step
  • suffix is either dev or empty and is set by environment variables
  • currentMillisString contains the timestamp when the whole pipeline was launched, so that new files don't overwrite old files on GCS when a pipeline gets restarted
  • FileNaming implements a custom naming and receives the type of the event (the topic) in its constructor; it uses a custom formatter to write to daily partitioned "sub-folders" on GCS:
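The Event POJO itself is not shown in the answer. A minimal sketch of what it might look like, assuming the getTopic()/getPayload() accessors used in the pipeline above (field names are an assumption; it should be serializable so Beam can ship it to workers):

```java
import java.io.Serializable;

// Hypothetical Event POJO matching the getTopic()/getPayload() calls above.
class Event implements Serializable {
  private final String topic;   // Kafka topic the message came from
  private final String payload; // raw message payload as a String

  Event(String topic, String payload) {
    this.topic = topic;
    this.payload = payload;
  }

  String getTopic() { return topic; }
  String getPayload() { return payload; }
}
```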

import java.util.TimeZone;

import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

class FileNaming implements FileIO.Write.FileNaming {
  static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
    return new FileNaming(topic, suffix, currentMillisString);
  }

  private static final DateTimeFormatter FORMATTER = DateTimeFormat
      .forPattern("yyyy-MM-dd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));

  private final String topic;
  private final String suffix;
  private final String currentMillisString;

  private String filenamePrefixForWindow(IntervalWindow window) {
    return String.format(
        "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
  }

  private FileNaming(String topic, String suffix, String currentMillisString) {
    this.topic = topic;
    this.suffix = suffix;
    this.currentMillisString = currentMillisString;
  }

  @Override
  public String getFilename(
      BoundedWindow window,
      PaneInfo pane,
      int numShards,
      int shardIndex,
      Compression compression) {

    IntervalWindow intervalWindow = (IntervalWindow) window;
    String filenamePrefix = filenamePrefixForWindow(intervalWindow);
    String filename =
        String.format(
            "pane-%d-%s-%05d-of-%05d%s",
            pane.getIndex(),
            pane.getTiming().toString().toLowerCase(),
            shardIndex,
            numShards,
            suffix);
    String fullName = filenamePrefix + filename;
    return fullName;
  }
}
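With this scheme, a file for topic transfer in the daily window starting 2018-10-01, written by the on-time pane of a single shard, would land under a path like transfer/2018-10-01/&lt;millis&gt;_pane-0-on_time-00000-of-00001. The formatting can be checked in isolation; all concrete values below are made up for illustration:

```java
// Standalone re-creation of the two String.format calls in FileNaming,
// with hypothetical example values standing in for the window/pane inputs.
String topic = "transfer";
String day = "2018-10-01";                   // FORMATTER.print(window.start())
String currentMillisString = "1538400000000";
String suffix = "";

String prefix = String.format("%s/%s/%s_", topic, day, currentMillisString);
String filename = String.format(
    "pane-%d-%s-%05d-of-%05d%s",
    0,          // pane.getIndex()
    "on_time",  // pane.getTiming().toString().toLowerCase()
    0, 1,       // shardIndex, numShards
    suffix);

System.out.println(prefix + filename);
// → transfer/2018-10-01/1538400000000_pane-0-on_time-00000-of-00001
```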

