How to sort a unioned Flink DataStream without watermarks

Problem description

My Flink job has multiple data streams, which I merge with the org.apache.flink.streaming.api.datastream.DataStream#union method. The problem is that the merged stream is out of order, and I cannot set up a window to sort the data in it.

Related question: Sort union of streams to identify user sessions in Apache Flink

I found an answer there, but com.liam.learn.flink.example.union.UnionStreamDemo.SortFunction#onTimer is never invoked.

Environment info: Flink version 1.7.0

In general, I want to sort the unioned data stream without watermarks.

Recommended answer

You need watermarks so that the sorting function knows when it can safely emit sorted elements. Without watermarks, you could get a record from stream B that has an earlier timestamp than any of the first N records of stream A, right?
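To make that point concrete, here is a minimal plain-Java sketch of the idea, with no Flink dependency. The class WatermarkSortSketch and its methods onEvent and onWatermark are hypothetical names invented for this illustration, not Flink APIs. Events are buffered in a priority queue and released only once a watermark says no earlier timestamp can still arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only: simulates watermark-gated sorting without Flink.
final class WatermarkSortSketch {
    // Min-heap of buffered event timestamps.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private final List<Long> emitted = new ArrayList<>();

    /** Buffers an event timestamp; nothing is emitted yet. */
    void onEvent(long timestamp) {
        buffer.add(timestamp);
    }

    /** Releases, in ascending order, every buffered timestamp at or before the watermark. */
    void onWatermark(long watermark) {
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            emitted.add(buffer.poll());
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkSortSketch sorter = new WatermarkSortSketch();
        sorter.onEvent(30);      // stream A
        sorter.onEvent(50);      // stream A
        sorter.onEvent(20);      // stream B: arrives later but has an earlier timestamp
        sorter.onWatermark(30);  // safe to release 20 and 30
        sorter.onEvent(40);      // stream B
        sorter.onWatermark(50);  // safe to release 40 and 50
        System.out.println(sorter.emitted()); // prints [20, 30, 40, 50]
    }
}
```

If the sorter emitted each event as soon as it arrived, 30 and 50 would go out before 20. Holding events until a watermark passes them is what makes the output order correct.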

But adding watermarks is easy, especially if you know that event time is strictly increasing within each stream. Below is some code I wrote that extends what David Anderson posted in his answer to the other SO question you referenced above. Hopefully this will get you started.

-- Ken

package com.scaleunlimited.flinksnippets;

import java.util.PriorityQueue;
import java.util.Random;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class MergeAndSortStreamsTest {

    @Test
    public void testMergeAndSort() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Event> streamA = env.addSource(new EventSource("A"))
                .assignTimestampsAndWatermarks(new EventTSWAssigner());
        DataStream<Event> streamB = env.addSource(new EventSource("B"))
                .assignTimestampsAndWatermarks(new EventTSWAssigner());

        streamA.union(streamB)
        .keyBy(r -> r.getKey())
        .process(new SortByTimestampFunction())
        .print();

        env.execute();
    }

    private static class Event implements Comparable<Event> {
        private String _label;
        private long _timestamp;

        public Event(String label, long timestamp) {
            _label = label;
            _timestamp = timestamp;
        }

        public String getLabel() {
            return _label;
        }

        public void setLabel(String label) {
            _label = label;
        }

        public String getKey() {
            return "1";
        }

        public long getTimestamp() {
            return _timestamp;
        }

        public void setTimestamp(long timestamp) {
            _timestamp = timestamp;
        }

        @Override
        public String toString() {
            return String.format("%s @ %d", _label, _timestamp);
        }

        @Override
        public int compareTo(Event o) {
            return Long.compare(_timestamp, o._timestamp);
        }
    }

    @SuppressWarnings("serial")
    private static class EventTSWAssigner extends AscendingTimestampExtractor<Event> {

        @Override
        public long extractAscendingTimestamp(Event element) {
            return element.getTimestamp();
        }
    }

    @SuppressWarnings("serial")
    private static class SortByTimestampFunction extends KeyedProcessFunction<String, Event, Event> {
        private ValueState<PriorityQueue<Event>> queueState = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "sorted-events",
                    // type information of state
                    TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
                    }));
            queueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
            TimerService timerService = context.timerService();

            long currentWatermark = timerService.currentWatermark();
            System.out.format("processElement called with watermark %d\n", currentWatermark);
            // Buffer only events ahead of the watermark; events at or behind it are late and are dropped.
            if (context.timestamp() > currentWatermark) {
                PriorityQueue<Event> queue = queueState.value();
                if (queue == null) {
                    queue = new PriorityQueue<>(10);
                }

                queue.add(event);
                queueState.update(queue);
                timerService.registerEventTimeTimer(event.getTimestamp());
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
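            PriorityQueue<Event> queue = queueState.value();
            long watermark = context.timerService().currentWatermark();
            System.out.format("onTimer called with watermark %d\n", watermark);
            // Emit every buffered event at or before the current watermark, in timestamp order.
            Event head = queue.peek();
            while (head != null && head.getTimestamp() <= watermark) {
                out.collect(head);
                queue.poll();
                head = queue.peek();
            }
            // Write the drained queue back so the removal also persists with non-heap state backends.
            queueState.update(queue);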
        }
    }

    @SuppressWarnings("serial")
    private static class EventSource extends RichParallelSourceFunction<Event> {

        private String _prefix;

        private transient Random _rand;
        private transient boolean _running;
        private transient int _numEvents;

        public EventSource(String prefix) {
            _prefix = prefix;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            _rand = new Random(_prefix.hashCode() + getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void cancel() {
            _running = false;
        }

        @Override
        public void run(SourceContext<Event> context) throws Exception {
            _running = true;
            _numEvents = 0;
            long timestamp = System.currentTimeMillis() + _rand.nextInt(10);

            while (_running && (_numEvents < 100)) {
                long deltaTime = timestamp - System.currentTimeMillis();
                if (deltaTime > 0) {
                    Thread.sleep(deltaTime);
                }

                context.collect(new Event(_prefix, timestamp));
                _numEvents++;

                // Advance the timestamp by 5 to 14 ms (nextInt(10) yields 0..9), about 9.5 ms on average.
                timestamp += (5 + _rand.nextInt(10));
            }
        }

    }
}
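
As a rough model of why this works, and of why onTimer never fires when no watermarks exist, the plain-Java sketch below mimics two behaviours: an ascending-timestamp assigner reports a watermark one millisecond behind the largest timestamp seen so far, and an operator fed by several inputs advances to the minimum of its input watermarks. UnionWatermarkSketch and its methods are hypothetical names invented for this illustration. Real Flink also emits watermarks periodically rather than per event, which this sketch ignores.

```java
// Hypothetical illustration only: models watermark arithmetic without Flink.
final class UnionWatermarkSketch {

    /** Watermark for one ascending input: largest timestamp seen so far, minus 1. */
    static long ascendingWatermark(long[] timestampsSeen) {
        long max = Long.MIN_VALUE;
        for (long ts : timestampsSeen) {
            max = Math.max(max, ts);
        }
        // No events yet means no watermark yet.
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - 1;
    }

    /** Watermark of an operator with several inputs: the minimum over its inputs (pass at least one). */
    static long combinedWatermark(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : inputWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        long wmA = ascendingWatermark(new long[] {100L, 110L, 125L});
        long wmB = ascendingWatermark(new long[] {95L, 105L});
        System.out.println(wmA);                         // prints 124
        System.out.println(wmB);                         // prints 104
        System.out.println(combinedWatermark(wmA, wmB)); // prints 104
    }
}
```

In this model an event-time timer registered for timestamp 105 would not fire yet, because the combined watermark is still 104. If neither input ever produced a watermark, the combined value would stay at Long.MIN_VALUE and no event-time timer could fire while the streams are running.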
