Flink Streaming:如何根据数据将一个数据流输出到不同的输出? [英] Flink Streaming: How to output one data stream to different outputs depending on the data?
问题描述
在Apache Flink中,我有一个元组流。让我们假设一个非常简单的 Tuple1< String>
。元组可以在其值字段中具有任意值(例如,P1,P2等)。可能值的集合是有限的,但我事先并不知道全集(因此可能存在'P362')。我想根据元组内部的值将该元组写入某个输出位置。所以例如我想拥有以下文件结构:
In Apache Flink I have a stream of tuples. Let's assume a really simple Tuple1<String>
. The tuple can have an arbitrary value in it's value field (e.g. 'P1', 'P2', etc.). The set of possible values is finite but I don't know the full set beforehand (so there could be a 'P362'). I want to write that tuple to a certain output location depending on the value inside of the tuple. So e.g. I would like to have the following file structure:
-
/ output / P1
-
/ output / P2
/output/P1
/output/P2
在文档中我只发现了写入我事先知道的位置的可能性(例如 stream.writeCsv(/ output / somewhere)
),但没有办法让内容数据确定数据实际结束的位置。
In the documentation I only found possibilities to write to locations that I know beforehand (e.g. stream.writeCsv("/output/somewhere")
), but no way of letting the contents of the data decide where the data is actually ending up.
我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到不同的目的地我想要它的方式(或者我只是不明白这将如何工作)。
I read about output splitting in the documentation but this doesn't seem to provide a way to redirect the output to different destinations the way I would like to have it (or I just don't understand how this would work).
这可以通过Flink API完成,如果是这样的话, 怎么样?如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己制作这样的东西?
Can this be done with the Flink API, if so, how? If not, is there maybe a third party library that can do it or would I have to build such a thing on my own?
更新
根据Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化之后将元组写入相应的文件。我把它放在这里作为参考,也许它对其他人有用:
Following Matthias' suggestion I came up with a sifting sink function which determines the output path and then writes the tuple to the respective file after serializing it. I put it here for reference, maybe it is useful for someone else:
public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {
private final OutputSelector<IT> outputSelector;
private final MapFunction<IT, String> serializationFunction;
private final String basePath;
Map<String, TextOutputFormat<String>> formats = new HashMap<>();
/**
* @param outputSelector the selector which determines into which output(s) a record is written.
* @param serializationFunction a function which serializes the record to a string.
* @param basePath the base path for writing the records. It will be appended with the output selector.
*/
public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) {
this.outputSelector = outputSelector;
this.serializationFunction = serializationFunction;
this.basePath = basePath;
}
@Override
public void invoke(IT value) throws Exception {
// find out where to write.
Iterable<String> selection = outputSelector.select(value);
for (String s : selection) {
// ensure we have a format for this.
TextOutputFormat<String> destination = ensureDestinationExists(s);
// then serialize and write.
destination.writeRecord(serializationFunction.map(value));
}
}
private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException {
// if we know the destination, we just return the format.
if (formats.containsKey(selection)) {
return formats.get(selection);
}
// create a new output format and initialize it from the context.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(context.getTaskStubParameters());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// put it into our map.
formats.put(selection, format);
return format;
}
@Override
public void close() throws IOException {
Exception lastException = null;
try {
for (TextOutputFormat<String> format : formats.values()) {
try {
format.close();
} catch (Exception e) {
lastException = e;
format.tryCleanupOnError();
}
}
} finally {
formats.clear();
}
if (lastException != null) {
throw new IOException("Close failed.", lastException);
}
}
}
推荐答案
您可以实现自定义接收器。继承以下两者之一:
You can implement a custom sink. Inherit from one of both:
-
org.apache.flink.streaming.api.functions.sink.SinkFunction
-
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
org.apache.flink.streaming.api.functions.sink.SinkFunction
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
在你的程序中使用:
stream.addSink(SinkFunction<T> sinkFunction);
而不是 stream.writeCsv(/ output / somewhere)
。
这篇关于Flink Streaming:如何根据数据将一个数据流输出到不同的输出?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!