如何在其他的基础上过滤Apache flink流? [英] How to filter Apache flink stream on the basis of other?

查看:205
本文介绍了如何在其他的基础上过滤Apache flink流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个流,一个是Int的,另一个是json的.在json模式中,有一个键是某个int.所以我需要通过与其他整数流进行键比较来过滤json流,这是否可能在Flink中?

I have two stream one is of Int and other is of json .In The json Schema there is one key which is some int .So i need to filter the json stream via key comparison with the other integer stream so Is it possible in Flink?

推荐答案

是的,您可以使用Flink进行这种流处理. Flink需要的基本构建块是连接的流和有状态的函数-这是一个使用RichCoFlatMap的示例:

Yes, you can do this kind of stream processing with Flink. The basic building blocks you need from Flink are connected streams, and stateful functions -- here's an example using a RichCoFlatMap:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class Connect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> control = env.fromElements(
                new Event(17),
                new Event(42))
                .keyBy("key");

        DataStream<Event> data = env.fromElements(
                new Event(2),
                new Event(42),
                new Event(6),
                new Event(17),
                new Event(8),
                new Event(42)
                )
                .keyBy("key");

        DataStream<Event> result = control
                .connect(data)
                .flatMap(new MyConnectedStreams());

        result.print();

        env.execute();
    }

    static final class MyConnectedStreams
            extends RichCoFlatMapFunction<Event, Event, Event> {

        private ValueState<Boolean> seen = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "have-seen-key",
                    // type information of state
                    TypeInformation.of(new TypeHint<Boolean>() {
                    }));
            seen = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap1(Event control, Collector<Event> out) throws Exception {
            seen.update(Boolean.TRUE);
        }

        @Override
        public void flatMap2(Event data, Collector<Event> out) throws Exception {
            if (seen.value() == Boolean.TRUE) {
                out.collect(data);
            }
        }
    }


    public static final class Event {
        public Event() {
        }

        public Event(int key) {
            this.key = key;
        }

        public int key;

        public String toString() {
            return String.valueOf(key);
        }
    }
}

在此示例中,只有在控制流中看到的那些键才通过数据流传递-过滤掉所有其他事件.我利用了 Flink的托管键状态

In this example, only those keys that have been seen on the control stream are passed through the data stream -- all other events are filtered out. I've taken advantage of Flink's managed keyed state and connected streams.

为简单起见,我已经忽略了您对数据流必须具有JSON的要求,但是您可以在其他地方找到有关如何使用JSON和Flink的示例.

To keep this simple I've ignored your requirement that the data stream has JSON, but you can find examples of how to work with JSON and Flink elsewhere.

请注意,由于您无法控制两个流相对于彼此的时间,因此您的结果将是不确定的.您可以通过将事件时间时间戳记添加到流中,然后使用RichCoProcessFunction来进行管理.

Note that your results will be non-deterministic, since you have no control over the timing of the two streams relative to one another. You could manage this by adding event-time timestamps to the streams, and then using a RichCoProcessFunction instead.

这篇关于如何在其他的基础上过滤Apache flink流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆