How to filter an Apache Flink stream on the basis of another?

Question

I have two streams: one of Int and the other of JSON. The JSON schema has a key whose value is an int. I need to filter the JSON stream by comparing that key with the values on the integer stream. Is this possible in Flink?

Answer

Yes, you can do this kind of stream processing with Flink. The basic building blocks you need are connected streams and stateful functions. Here's an example using a RichCoFlatMapFunction:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class Connect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

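        // Control stream: the keys that should be let through.
        // keyBy with a KeySelector replaces the deprecated field-name form keyBy("key").
        DataStream<Event> control = env.fromElements(
                new Event(17),
                new Event(42))
                .keyBy(e -> e.key);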

        // Data stream: the events to be filtered, keyed the same way as the control stream.
        DataStream<Event> data = env.fromElements(
                new Event(2),
                new Event(42),
                new Event(6),
                new Event(17),
                new Event(8),
                new Event(42)
                )
                .keyBy(e -> e.key);

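        // Connect the two keyed streams: events with the same key from both sides
        // go to the same function instance and share that key's state.
        DataStream<Event> result = control
                .connect(data)
                .flatMap(new MyConnectedStreams());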

        result.print();

        env.execute();
    }

    static final class MyConnectedStreams
            extends RichCoFlatMapFunction<Event, Event, Event> {

        // Keyed state: one Boolean per key, set once that key appears on the control stream.
        private ValueState<Boolean> seen = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "have-seen-key",
                    // type information of state
                    TypeInformation.of(new TypeHint<Boolean>() {
                    }));
            seen = getRuntimeContext().getState(descriptor);
        }

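        // Control stream: remember that this key has been seen.
        @Override
        public void flatMap1(Event control, Collector<Event> out) throws Exception {
            seen.update(Boolean.TRUE);
        }

        // Data stream: forward the event only if its key was already seen on the control stream.
        @Override
        public void flatMap2(Event data, Collector<Event> out) throws Exception {
            // value() returns null for keys never seen, so use a null-safe equals, not ==.
            if (Boolean.TRUE.equals(seen.value())) {
                out.collect(data);
            }
        }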
    }


    public static final class Event {
        public Event() {
        }

        public Event(int key) {
            this.key = key;
        }

        public int key;

        @Override
        public String toString() {
            return String.valueOf(key);
        }
    }
}

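In this example, only data events whose keys have already been seen on the control stream are passed through; all other events are filtered out. I've taken advantage of Flink's managed keyed state and connected streams. Because both streams are keyed by the same field, the control and data events for a given key reach the same function instance and share that key's "seen" flag.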
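
To make the semantics concrete, here is a plain-Java simulation of what one parallel instance of MyConnectedStreams does. It has no Flink dependency, and the class and helper names (KeyedFilterSim, control, data, run) are hypothetical. It assumes the events arrive in exactly the listed order, and a HashSet stands in for the per-key ValueState<Boolean>. Running the same events in two arrival orders also shows that the output depends on timing:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java stand-in (not Flink code) for one parallel instance of MyConnectedStreams.
public class KeyedFilterSim {

    // One element of the interleaved input: which stream it came from, and its key.
    static final class Input {
        final boolean fromControl;
        final int key;

        Input(boolean fromControl, int key) {
            this.fromControl = fromControl;
            this.key = key;
        }
    }

    static Input control(int key) { return new Input(true, key); }

    static Input data(int key) { return new Input(false, key); }

    // Processes inputs in the given arrival order; returns the keys of data events that pass.
    static List<Integer> run(List<Input> arrivals) {
        Set<Integer> seen = new HashSet<>(); // plays the role of the per-key ValueState<Boolean>
        List<Integer> out = new ArrayList<>();
        for (Input in : arrivals) {
            if (in.fromControl) {
                seen.add(in.key);            // like flatMap1: seen.update(Boolean.TRUE)
            } else if (seen.contains(in.key)) {
                out.add(in.key);             // like flatMap2: out.collect(data)
            }
        }
        return out;
    }

    // Both control events arrive before any data event.
    static List<Integer> controlFirst() {
        return run(List.of(control(17), control(42),
                data(2), data(42), data(6), data(17), data(8), data(42)));
    }

    // The same events, but every data event arrives before the control events.
    static List<Integer> dataFirst() {
        return run(List.of(data(2), data(42), data(6), data(17), data(8), data(42),
                control(17), control(42)));
    }

    public static void main(String[] args) {
        System.out.println(controlFirst()); // [42, 17, 42]
        System.out.println(dataFirst());    // []
    }
}
```

In the real job, the two fromElements sources are read concurrently, so the actual interleaving can fall anywhere between these two extremes.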

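To keep this simple, I've ignored your requirement that the data stream contains JSON. In practice you would first map each JSON record to an object such as Event, extracting the integer field, and then key the stream by it. You can find examples of how to work with JSON in Flink elsewhere.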

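Note that your results will be non-deterministic, since you have no control over the timing of the two streams relative to one another. A data event that arrives before the control event for its key is simply dropped. You could manage this by adding event-time timestamps and watermarks to the streams and then using a KeyedCoProcessFunction instead. That function can buffer data events in keyed state until the watermark shows that the control stream has caught up.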
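
The event-time idea can be sketched in plain Java so it runs on its own. It is not Flink API, and the class and method names (EventTimeFilterSim, onControl, onData, onWatermark) are hypothetical. Data events are buffered, and a decision is made only once the watermark passes a data event's timestamp, because by then no control event with an earlier timestamp can still arrive. The sketch relies on three assumptions:

- A data event passes only if its key appeared on the control stream at or before the data event's timestamp. This rule is chosen for illustration.
- The watermark passed in is already the minimum of both inputs' watermarks, as Flink computes for a two-input operator.
- In Flink itself, the buffer would live in keyed state and the decision would be made in an event-time onTimer callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink API) of event-time filtering with buffering until the watermark.
public class EventTimeFilterSim {

    // Per key: earliest event time at which the control stream announced that key.
    private final Map<Integer, Long> controlSince = new HashMap<>();
    // Buffered data events ordered by event time: timestamp -> keys carrying that timestamp.
    private final TreeMap<Long, List<Integer>> pending = new TreeMap<>();
    private final List<Integer> out = new ArrayList<>();

    void onControl(int key, long ts) {
        controlSince.merge(key, ts, Long::min);
    }

    void onData(int key, long ts) {
        pending.computeIfAbsent(ts, t -> new ArrayList<>()).add(key);
    }

    // A watermark wm promises no more events with timestamp <= wm on either input,
    // so every buffered data event up to wm can now be decided.
    void onWatermark(long wm) {
        while (!pending.isEmpty() && pending.firstKey() <= wm) {
            Map.Entry<Long, List<Integer>> entry = pending.pollFirstEntry();
            List<Integer> keys = entry.getValue();
            keys.sort(null); // break ties within one timestamp deterministically
            for (int key : keys) {
                Long since = controlSince.get(key);
                if (since != null && since <= entry.getKey()) {
                    out.add(key);
                }
            }
        }
    }

    List<Integer> output() {
        return out;
    }

    // Control events arrive first. Key 8 is announced at time 6, after its data event at time 5.
    static List<Integer> controlFirst() {
        EventTimeFilterSim f = new EventTimeFilterSim();
        f.onControl(17, 1);
        f.onControl(42, 1);
        f.onControl(8, 6);
        f.onData(2, 2);
        f.onData(42, 3);
        f.onData(17, 4);
        f.onData(8, 5);
        f.onWatermark(10);
        return f.output();
    }

    // The same events, but the data events arrive before the control events.
    static List<Integer> dataFirst() {
        EventTimeFilterSim f = new EventTimeFilterSim();
        f.onData(2, 2);
        f.onData(42, 3);
        f.onData(17, 4);
        f.onData(8, 5);
        f.onControl(8, 6);
        f.onControl(42, 1);
        f.onControl(17, 1);
        f.onWatermark(10);
        return f.output();
    }

    public static void main(String[] args) {
        System.out.println(controlFirst()); // [42, 17]
        System.out.println(dataFirst());    // [42, 17]
    }
}
```

Unlike the arrival-order version, both orderings give the same result, because each decision waits for the watermark instead of depending on which stream happened to deliver first.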