Apache Flink 0.10: how to get the first occurrence of a composite key from an unbounded input DataStream?


Question

I am a newbie with Apache Flink. I have an unbounded data stream in my input (fed into Flink 0.10 via Kafka).

I want to get the first occurrence of each primary key (the primary key is the contract_num and the event_dt).
These "duplicates" occur nearly immediately after each other. The source system cannot filter this for me, so Flink has to do it.

Here is my input data:

contract_num, event_dt, attr 
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Here is the output data I want:

A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Note that the 2nd row has been removed, as the key combination of A1 and '2016-02-24 10:25:08' already occurred in the 1st row.

How can I do this with Flink 0.10?

I was thinking about using keyBy(0, 1), but after that I don't know what to do!

(I used joda-time and org.flinkspector to set up these tests.)

@Test
public void test() {
    DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
    DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
    DateTime oneSecondsAgo = (new DateTime()).minusSeconds(1);

    DataStream<Tuple3<String, Date, String>> testStream =
            createTimedTestStreamWith(
                    Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
            .emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A2", oneSecondsAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
            .close();
    
    testStream.keyBy(0,1);
}

Answer

Filtering duplicates over an infinite stream will eventually fail if your key space is larger than your available storage space. The reason is that you have to store the already seen keys somewhere to filter out the duplicates. Thus, it would be good to define a time window after which you can purge the current set of seen keys.

If you're aware of this problem but want to try it anyway, you can do it by applying a stateful flatMap operation after the keyBy call. The stateful mapper uses Flink's state abstraction to store whether it has already seen an element with this key or not. That way, you will also benefit from Flink's fault tolerance mechanism because your state will be automatically checkpointed.

A Flink program doing your job could look like this:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, Date, String>> input = env.fromElements(
            Tuple3.of("foo", new Date(1000), "bar"),
            Tuple3.of("foo", new Date(1000), "foobar"));

    input.keyBy(0, 1).flatMap(new DuplicateFilter()).print();

    env.execute("Test");
}

where the implementation of DuplicateFilter depends on the version of Flink.

Version 1.0 implementation:

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
    private ValueState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            operatorState.update(true);
        }
    }
}

Version 0.10 implementation:

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    private OperatorState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getKeyValueState("seen", Boolean.class, false);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            operatorState.update(true);
        }
    }
}
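
As a side note on the purging concern raised at the start of this answer, below is a minimal sketch (not part of the original answer) of how seen keys could expire: instead of a Boolean flag, the state stores the processing-time timestamp of the first occurrence, and a hypothetical EXPIRY_MS interval decides when a key counts as unseen again. It assumes the Flink 1.0-style state API from the first implementation above.

public static class ExpiringDuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    // hypothetical expiry interval; tune it to how far apart duplicates can occur
    private static final long EXPIRY_MS = 60_000;

    // processing-time timestamp of the first occurrence of the current key
    private ValueState<Long> firstSeen;

    @Override
    public void open(Configuration configuration) {
        firstSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("firstSeen", Long.class, null));
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        long now = System.currentTimeMillis();
        Long seenAt = firstSeen.value();
        if (seenAt == null || now - seenAt > EXPIRY_MS) {
            // first occurrence, or the previous occurrence has expired: emit and remember it
            out.collect(value);
            firstSeen.update(now);
        }
    }
}

Note that this variant only treats expired keys as unseen again; it does not actually free the stored state. Truly discarding state is what the window-based variant below achieves.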

Update: using a tumbling time window

input.keyBy(0, 1).timeWindow(Time.seconds(1)).apply(new WindowFunction<Iterable<Tuple3<String, Date, String>>, Tuple3<String, Date, String>, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Date, String>> input, Collector<Tuple3<String, Date, String>> out) throws Exception {
        // emit only the first element of each key's window
        out.collect(input.iterator().next());
    }
});
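
Note that the window approach keeps the state bounded, because a window's contents are discarded once it has fired, but deduplication then only works within a single one-second window: two occurrences of the same key that fall into different windows will both be emitted.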
