Flink: how to store state and use it in another stream?
Problem description
I have a use-case for Flink where I need to read information from a file, store each line, and then use this state to filter another stream.
I have all of this working right now with the connect operator and a RichCoFlatMapFunction, but it feels overly complicated. Also, I'm concerned that flatMap2 could begin executing before all of the state is loaded from the file:
fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;

        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

        @Override
        public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
            if (record.getPartId().equals(storedPartId.value())) {
                out.collect(record);
            } else {
                // do nothing
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>(
                    "partId", // the state name
                    TypeInformation.of(new TypeHint<String>() {}),
                    null);
            storedPartId = getRuntimeContext().getState(descriptor);
        }
    });
Is there a better way (as of Flink 1.1.3) to accomplish this pattern of loading state, then using it in subsequent streams?
Recommended answer
Your concerns about the CoFlatMapFunction are correct. The order in which flatMap1 and flatMap2 are called cannot be controlled and depends on the order in which data arrives. So flatMap2 might be called before flatMap1 has read all of the data.
The only way in Flink 1.1.3 to read all data before starting to process a stream is to consume the data in the open() method of a RichFlatMapFunction, i.e., you have to manually read and parse the file.
This is basically a broadcast join strategy, i.e., each parallel instance of the operator will do this. The drawback is that the data of the file will be replicated. The benefit is that you do not have to shuffle the "main" stream (no need to use keyBy()).
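The load-in-open() approach described above could look roughly like the following. This is only a dependency-free sketch, not the Flink API itself: FilePartIdFilter and the minimal PartRecord are hypothetical stand-ins, the file is assumed to contain one part ID per line, and a Consumer takes the place of Flink's Collector. In a real job the class would extend RichFlatMapFunction<PartRecord, PartRecord>, open() would take a Configuration, and flatMap() would take a Collector<PartRecord>.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the asker's PartRecord type.
class PartRecord {
    private final String partId;
    PartRecord(String partId) { this.partId = partId; }
    String getPartId() { return partId; }
}

// Sketch of the filter logic without any Flink dependency.
// Assumption: in Flink this would extend
// RichFlatMapFunction<PartRecord, PartRecord>.
class FilePartIdFilter {
    private final String path;   // assumed format: one part ID per line
    private Set<String> partIds; // filled in open(), not in the constructor

    FilePartIdFilter(String path) { this.path = path; }

    // Runs once per parallel instance before any record is processed,
    // so the whole file is loaded before filtering starts.
    void open() throws IOException {
        partIds = new HashSet<>();
        for (String line : Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8)) {
            String id = line.trim();
            if (!id.isEmpty()) {
                partIds.add(id);
            }
        }
    }

    // Emits the record only if its part ID was present in the file.
    void flatMap(PartRecord record, Consumer<PartRecord> out) {
        if (partIds.contains(record.getPartId())) {
            out.accept(record);
        }
    }
}
```

With a function shaped like this, the part-record stream would be passed straight to flatMap(...) without a preceding keyBy(). Because every parallel instance reads the file in its own open() call, the file presumably has to be readable from every machine that runs the operator.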