What does it mean that "broadcast state" unblocks the implementation of the "dynamic patterns" feature for Flink's CEP library?


Question

The Flink 1.5 release announcement says that Flink now supports "broadcast state", and describes it this way: "broadcast state unblocks the implementation of the 'dynamic patterns' feature for Flink's CEP library."

Does this mean that we can currently use broadcast state to implement dynamic patterns without Flink CEP? I also don't understand what difference it makes to implement dynamic patterns for Flink CEP with or without broadcast state. I would appreciate it if someone could give an example with code that explains the difference.

============= Update =============

Testing a broadcast data stream connected to a keyed data stream via the broadcast() operator

After testing in Flink 1.4.2, I found that a broadcast data stream (created with the old broadcast() operator) can be connected to a keyed data stream. The test code is below. In the output, every control stream event was broadcast to every operator instance, so it seems the old broadcast() can achieve the same functionality as the new broadcast state.

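import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastConnectTest {

    public static void main(String[] args) throws Exception {
        connectBroadcastToKeyedStream();
    }

    public static void connectBroadcastToKeyedStream() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Control stream: the values that should be blacklisted.
        List<Tuple1<String>> controlData = new ArrayList<>();
        controlData.add(new Tuple1<>("DROP"));
        controlData.add(new Tuple1<>("IGNORE"));
        DataStream<Tuple1<String>> control = env.fromCollection(controlData);

        // Data stream: the events to be checked against the blacklist.
        List<Tuple1<String>> dataStreamData = new ArrayList<>();
        dataStreamData.add(new Tuple1<>("data"));
        dataStreamData.add(new Tuple1<>("DROP"));
        dataStreamData.add(new Tuple1<>("artisans"));
        dataStreamData.add(new Tuple1<>("IGNORE"));
        dataStreamData.add(new Tuple1<>("IGNORE"));
        dataStreamData.add(new Tuple1<>("IGNORE"));
        dataStreamData.add(new Tuple1<>("IGNORE"));

        // Key the data stream by its only field.
        DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

        // Send every control element to all parallel instances, then connect the two streams.
        DataStream<String> result = control
                .broadcast()
                .connect(keyedDataStream)
                .flatMap(new MyCoFlatMap());
        result.print();
        env.execute();
    }

    private static final class MyCoFlatMap
            implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {

        // A plain Java field, not Flink managed state: each parallel instance
        // keeps its own copy, and it is not included in checkpoints.
        private final Set<Tuple1<String>> blacklist = new HashSet<>();

        // Called for each element of the broadcast control stream.
        @Override
        public void flatMap1(Tuple1<String> controlValue, Collector<String> out) {
            blacklist.add(controlValue);
            out.collect("listed " + controlValue);
        }

        // Called for each element of the keyed data stream.
        @Override
        public void flatMap2(Tuple1<String> dataValue, Collector<String> out) {
            if (blacklist.contains(dataValue)) {
                out.collect("skipped " + dataValue);
            } else {
                out.collect("passed " + dataValue);
            }
        }
    }
}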

The test produced the following output:

1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)

https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement

Recommended answer

Without broadcast state, two Flink data streams cannot be processed together in a stateful way unless they are keyed in exactly the same way. A broadcast stream can be connected to a keyed stream, but if you then try to use keyed state in, for example, a RichCoFlatMapFunction, that will fail.

What is frequently desired is one stream of dynamic "rules" that are applied to every event on another stream, regardless of key. This required a new kind of managed Flink state in which the rules could be stored. With broadcast state, this can now be done in a straightforward way.

With this feature now in place, work on support for dynamic patterns in CEP can begin.
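
To make the difference concrete, here is a minimal sketch of how the blacklist test from the question might look with the broadcast state API that arrived in Flink 1.5. It assumes flink-streaming-java 1.5 or later is on the classpath. The class name BroadcastStateBlacklist, the state name "blacklist", and the use of plain String elements are illustrative choices, not something taken from the release announcement. The rules live in managed broadcast state rather than in a plain Java field, and the data side stays keyed.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateBlacklist {

    // Describes the broadcast state; the name "blacklist" is an assumption for this sketch.
    static final MapStateDescriptor<String, Boolean> BLACKLIST =
            new MapStateDescriptor<>(
                    "blacklist",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.BOOLEAN_TYPE_INFO);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Rule stream: values that should be blacklisted.
        DataStream<String> control = env.fromElements("DROP", "IGNORE");

        // Data stream, keyed by the element itself.
        KeyedStream<String, String> data = env
                .fromElements("data", "DROP", "artisans", "IGNORE")
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                });

        // broadcast(descriptor) returns a BroadcastStream backed by managed state.
        BroadcastStream<String> rules = control.broadcast(BLACKLIST);

        data.connect(rules)
                .process(new BlacklistFunction())
                .print();

        env.execute("broadcast-state-sketch");
    }

    static final class BlacklistFunction
            extends KeyedBroadcastProcessFunction<String, String, String, String> {

        // Keyed side: broadcast state is read-only here.
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            if (ctx.getBroadcastState(BLACKLIST).contains(value)) {
                out.collect("skipped " + value);
            } else {
                out.collect("passed " + value);
            }
        }

        // Broadcast side: every parallel instance receives each rule and stores it.
        @Override
        public void processBroadcastElement(String rule, Context ctx, Collector<String> out)
                throws Exception {
            ctx.getBroadcastState(BLACKLIST).put(rule, Boolean.TRUE);
            out.collect("listed " + rule);
        }
    }
}
```

Flink gives no ordering guarantee between the two inputs, so data elements that arrive before the rules will still be reported as passed, just as in the output shown in the question. What changes is that the rules are now checkpointed with the job, and that processElement runs in a keyed context, so keyed state and timers can be used alongside the broadcast rules.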
