How to use FileIO.writeDynamic() in Apache Beam 2.6 to write to multiple output paths?
Problem description
I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to alter the pipeline so that it reads multiple topics and writes them out as gs://bucket/topic/...
When reading only a single topic, I used TextIO in the last step of my pipeline:
.apply(
    TextIO.write()
        .to(new DateNamedFiles(
            String.format("gs://bucket/data%s/", suffix), currentMillisString))
        .withWindowedWrites()
        .withTempDirectory(
            FileBasedSink.convertToFileResourceIfPossible(
                String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
        .withNumShards(1));
There is a similar question whose code I tried to adapt:
.apply(
    FileIO.<EventType, Event>writeDynamic()
        .by(
            new SerializableFunction<Event, EventType>() {
                @Override
                public EventType apply(Event input) {
                    return EventType.TRANSFER; // should return real type here, just a dummy
                }
            })
        .via(
            Contextful.fn(
                new SerializableFunction<Event, String>() {
                    @Override
                    public String apply(Event input) {
                        return "Dummy"; // should return the Event converted to a String
                    }
                }),
            TextIO.sink())
        .to(DynamicFileDestinations.constant(
            new DateNamedFiles("gs://bucket/tmp%s/%s/", currentMillisString),
            new SerializableFunction<String, String>() {
                @Override
                public String apply(String input) {
                    return null; // Not sure what this should do exactly, but it needs to
                                 // include the EventType into the path
                }
            }))
        .withTempDirectory(
            FileBasedSink.convertToFileResourceIfPossible(
                String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
        .withNumShards(1));
The official JavaDoc contains example code that seems to have outdated method signatures (the .via method seems to have switched the order of its arguments). I also stumbled across the example in FileIO, which confused me: shouldn't TransactionType and Transaction in this line change places?
Recommended answer
After a night's sleep and a fresh start, I figured out the solution. I used the functional Java 8 style because it makes the code shorter (and more readable):
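.apply(
    FileIO.<String, Event>writeDynamic()
        // Destination = the Kafka topic the event came from
        .by((SerializableFunction<Event, String>) input -> input.getTopic())
        // Each event is written as its payload; TextIO.sink() writes one line per element
        .via(
            Contextful.fn(
                (SerializableFunction<Event, String>) input -> input.getPayload()),
            TextIO.sink())
        // Base directory; the per-topic sub-path comes from withNaming(...)
        .to(String.format("gs://bucket/data%s/", suffix))
        .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
        // Coder for the destination type (String)
        .withDestinationCoder(StringUtf8Coder.of())
        .withTempDirectory(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
        .withNumShards(1));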
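The answer does not show the Event class or spell out what writeDynamic() does with its two functions. The following is a plain-Java sketch, not Beam code: Event is a hypothetical POJO with only the two getters the pipeline calls (getTopic() and getPayload()), and route() is a hypothetical in-memory helper that groups each element under the destination chosen by the "by" function and keeps the String produced by the "via" function. Its type parameters are declared in the order <DestT, InputT>, which is the order FileIO.writeDynamic() uses as well; that is why the answer writes FileIO.<String, Event>writeDynamic(), and why (assuming the line in question is the FileIO.<TransactionType, Transaction>writeDynamic() call) the Javadoc example does not have the two types swapped.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DynamicRoutingDemo {
    // Hypothetical sketch of the Event POJO: the Kafka payload plus the topic it came from.
    // Serializable so that, in Beam, it could be encoded with SerializableCoder (an assumption).
    public static class Event implements Serializable {
        private final String topic;
        private final String payload;

        public Event(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }

        public String getTopic() { return topic; }
        public String getPayload() { return payload; }
    }

    // Hypothetical in-memory stand-in (not Beam) for the routing writeDynamic() describes:
    // 'by' picks the destination of each input, 'via' turns the input into the written line.
    // Type parameters are ordered <DestT, InputT>, like FileIO.<String, Event>writeDynamic().
    public static <DestT, InputT> Map<DestT, List<String>> route(
            List<InputT> inputs, Function<InputT, DestT> by, Function<InputT, String> via) {
        // LinkedHashMap keeps destinations in the order they are first seen
        Map<DestT, List<String>> linesPerDestination = new LinkedHashMap<>();
        for (InputT input : inputs) {
            linesPerDestination
                .computeIfAbsent(by.apply(input), dest -> new ArrayList<>())
                .add(via.apply(input));
        }
        return linesPerDestination;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("orders", "{\"id\":1}"),
            new Event("payments", "{\"id\":7}"),
            new Event("orders", "{\"id\":2}"));
        Map<String, List<String>> routed = route(events, Event::getTopic, Event::getPayload);
        System.out.println(routed);
    }
}
```

Running main prints {orders=[{"id":1}, {"id":2}], payments=[{"id":7}]}: both orders payloads share one destination, which in the real pipeline is the orders/... sub-path that FileNaming builds from the topic.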
Explanation:
- Event is a Java POJO containing the payload of the Kafka message and the topic it belongs to; it is parsed in a ParDo after the KafkaIO step.
- suffix is either dev or empty and is set by environment variables.
- currentMillisString contains the timestamp when the whole pipeline was launched, so that new files don't overwrite old files on GCS when a pipeline gets restarted.
- FileNaming implements a custom naming and receives the type of the event (the topic) in its constructor; it uses a custom formatter to write to daily partitioned "sub-folders" on GCS:
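import java.util.TimeZone;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Names files <topic>/<yyyy-MM-dd>/<launchMillis>_pane-<index>-<timing>-<shard>-of-<numShards><suffix>
class FileNaming implements FileIO.Write.FileNaming {
    static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
        return new FileNaming(topic, suffix, currentMillisString);
    }

    // Daily "sub-folders" are computed in the Europe/Zurich time zone
    private static final DateTimeFormatter FORMATTER = DateTimeFormat
        .forPattern("yyyy-MM-dd")
        .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));

    private final String topic;
    private final String suffix;
    private final String currentMillisString;

    private FileNaming(String topic, String suffix, String currentMillisString) {
        this.topic = topic;
        this.suffix = suffix;
        this.currentMillisString = currentMillisString;
    }

    private String filenamePrefixForWindow(IntervalWindow window) {
        return String.format(
            "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
    }

    @Override
    public String getFilename(
            BoundedWindow window,
            PaneInfo pane,
            int numShards,
            int shardIndex,
            Compression compression) {
        // Assumes the input is windowed into IntervalWindows (e.g. fixed windows);
        // this cast throws a ClassCastException for the GlobalWindow.
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String filenamePrefix = filenamePrefixForWindow(intervalWindow);
        String filename = String.format(
            "pane-%d-%s-%05d-of-%05d%s",
            pane.getIndex(),
            pane.getTiming().toString().toLowerCase(),
            shardIndex,
            numShards,
            suffix);
        return filenamePrefix + filename;
    }
}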
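To see which object names this naming scheme produces, the sketch below rebuilds the same string layout in plain Java. It swaps Joda-Time for java.time (same "yyyy-MM-dd" pattern, same Europe/Zurich zone) so it runs without Beam; objectName() is a hypothetical helper that combines filenamePrefixForWindow() and getFilename(), and the topic, window start and launch timestamp are made-up sample values. Prepending the base directory assumes FileIO resolves the returned filename relative to the directory passed to .to().

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FileNamingLayoutDemo {
    // Same pattern and zone as FORMATTER in FileNaming, but using java.time instead of Joda-Time
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Europe/Zurich"));

    // Hypothetical helper mirroring filenamePrefixForWindow() followed by getFilename()
    public static String objectName(String topic, Instant windowStart, String currentMillisString,
                                    int paneIndex, String timing, int shardIndex, int numShards,
                                    String suffix) {
        String prefix = String.format("%s/%s/%s_", topic, DAY.format(windowStart), currentMillisString);
        String name = String.format("pane-%d-%s-%05d-of-%05d%s",
            paneIndex, timing.toLowerCase(), shardIndex, numShards, suffix);
        return prefix + name;
    }

    public static void main(String[] args) {
        String baseDirectory = "gs://bucket/data/"; // the .to(...) directory when suffix is empty
        Instant windowStart = Instant.parse("2018-08-20T22:30:00Z"); // sample value
        System.out.println(baseDirectory
            + objectName("orders", windowStart, "1534800000000", 0, "ON_TIME", 0, 1, ""));
    }
}
```

This prints gs://bucket/data/orders/2018-08-21/1534800000000_pane-0-on_time-00000-of-00001. The window starts at 22:30 UTC on 20 August but lands in the 2018-08-21 folder, because the date is formatted in Zurich time (UTC+2 in summer), so the daily partitions follow local midnight rather than UTC midnight.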