apache flink 0.10 how to get the first occurrence of a composite key from an unbounded input dataStream?
Question
I am a newbie with Apache Flink. I have an unbounded data stream in my input (fed into Flink 0.10 via Kafka).
I want to get the first occurrence of each primary key (the primary key is the contract_num and the event_dt).
These "duplicates" occur nearly immediately after each other.
The source system cannot filter this out for me, so Flink has to do it.
Here is my input data:
contract_num, event_dt, attr
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
Here is the output data I want:
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
Note the 2nd row has been removed, as the key combination of A1 and '2016-02-24 10:25:08' already occurred in the 1st row.
How can I do this with Flink 0.10?
I was thinking about using keyBy(0, 1), but after that I don't know what to do!
(I used joda-time and org.flinkspector to set up these tests.)
@Test
public void test() {
    DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
    DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
    DateTime oneSecondAgo = (new DateTime()).minusSeconds(1);

    DataStream<Tuple3<String, Date, String>> testStream =
            createTimedTestStreamWith(
                    Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
            .emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A2", oneSecondAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
            .close();

    testStream.keyBy(0, 1);
}
Answer
Filtering duplicates over an infinite stream will eventually fail if your key space is larger than your available storage space. The reason is that you have to store the already-seen keys somewhere in order to filter out the duplicates. Thus, it would be good to define a time window after which you can purge the current set of seen keys.
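To see why the seen-key set grows without bound, here is a minimal plain-Java sketch (no Flink involved; the class and method names are illustrative, not part of any API) of a dedup filter that remembers every composite key it has seen, plus the purge step that a time window would trigger:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: a dedup filter that remembers every
// (contract_num, event_dt) pair it has seen. Without purge(),
// the 'seen' set grows with the key space of the stream.
public class FirstOccurrenceFilter {
    private final Set<String> seen = new HashSet<>();

    // Returns true only for the first occurrence of a composite key.
    public boolean isFirst(String contractNum, String eventDt) {
        // Set.add returns false if the element was already present.
        return seen.add(contractNum + "|" + eventDt);
    }

    // A time window would call this periodically to bound memory usage,
    // at the price of letting duplicates through after each purge.
    public void purge() {
        seen.clear();
    }

    public static void main(String[] args) {
        FirstOccurrenceFilter f = new FirstOccurrenceFilter();
        System.out.println(f.isFirst("A1", "2016-02-24 10:25:08")); // true
        System.out.println(f.isFirst("A1", "2016-02-24 10:25:08")); // false: duplicate
        System.out.println(f.isFirst("A1", "2016-02-24 10:25:09")); // true: new event_dt
        f.purge();
        System.out.println(f.isFirst("A1", "2016-02-24 10:25:08")); // true again after purge
    }
}
```

The purge makes the memory usage bounded, but it also means a duplicate arriving after the purge is treated as a first occurrence again; that trade-off is inherent in any windowed dedup.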
If you're aware of this problem but want to try it anyway, you can do it by applying a stateful flatMap operation after the keyBy call. The stateful mapper uses Flink's state abstraction to store whether it has already seen an element with this key or not. That way, you will also benefit from Flink's fault-tolerance mechanism, because your state will be automatically checkpointed.
A Flink program doing your job could look like:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, Date, String>> input = env.fromElements(
            Tuple3.of("foo", new Date(1000), "bar"),
            Tuple3.of("foo", new Date(1000), "foobar"));

    input.keyBy(0, 1).flatMap(new DuplicateFilter()).print();

    env.execute("Test");
}
where the implementation of DuplicateFilter depends on the version of Flink.

Version 1.0 implementation
public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    static final ValueStateDescriptor<Boolean> descriptor =
            new ValueStateDescriptor<>("seen", Boolean.class, false);

    private ValueState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            operatorState.update(true);
        }
    }
}
Version 0.10 implementation
public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    private OperatorState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getKeyValueState("seen", Boolean.class, false);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // mark the key as seen so that later duplicates are dropped
            operatorState.update(true);
        }
    }
}
Update: Using a tumbling time window
input.keyBy(0, 1).timeWindow(Time.seconds(1)).apply(
    new WindowFunction<Iterable<Tuple3<String, Date, String>>, Tuple3<String, Date, String>, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Date, String>> input, Collector<Tuple3<String, Date, String>> out) throws Exception {
            // emit only the first element of each key's window
            out.collect(input.iterator().next());
        }
    });
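Note that the window variant only deduplicates within each one-second window: a duplicate of the same composite key arriving in the next window is emitted again. The per-window "keep the first record per key" semantics can be sketched in plain Java like this (no Flink involved; class and field names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the tumbling-window dedup semantics:
// within one window, keep only the first record per (contract_num, event_dt) key.
public class WindowDedup {

    // records are String[] {contractNum, eventDt, attr}
    public static List<String[]> firstPerKey(List<String[]> window) {
        Map<String, String[]> firstSeen = new LinkedHashMap<>();
        for (String[] r : window) {
            // putIfAbsent keeps the first record stored under each key
            firstSeen.putIfAbsent(r[0] + "|" + r[1], r);
        }
        return new ArrayList<>(firstSeen.values());
    }

    public static void main(String[] args) {
        List<String[]> window = List.of(
                new String[]{"A1", "2016-02-24 10:25:08", "X"},
                new String[]{"A1", "2016-02-24 10:25:08", "Y"},
                new String[]{"A1", "2016-02-24 10:25:09", "Z"},
                new String[]{"A2", "2016-02-24 10:25:10", "C"});
        for (String[] r : firstPerKey(window)) {
            System.out.println(String.join(", ", r));
        }
        // the duplicate Y row is dropped; the X, Z, and C rows are emitted
    }
}
```

Picking the window size is therefore a trade-off: it must be long enough that the near-immediate duplicates land in the same window, but short enough that the state held per window stays small.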