Flink EventTime Processing Watermark is always coming as -9223372036854775808


Problem description

I am trying to use a process function to do some processing on a set of events. I am using event time and a keyed stream. The issue I am facing is that the watermark value always comes out as -9223372036854775808. I added print statements to debug, and they show the following:

timestamp------1583128014000 extractedTimestamp 1583128014000 currentwatermark-----9223372036854775808

timestamp------1583128048000 extractedTimestamp 1583128048000 currentwatermark-----9223372036854775808

timestamp------1583128089000 extractedTimestamp 1583128089000 currentwatermark-----9223372036854775808

So timestamp and extractedTimestamp change, but the watermark is not updated. As a result, no record gets into the queue, because context.timestamp is never less than the watermark.

        DataStream<GenericRecord> dataStream = env.addSource(searchConsumer).name("search_list_keyless");
        DataStream<GenericRecord> dataStreamWithWaterMark = dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

        try {
            dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
                StringBuilder builder = new StringBuilder();
                builder.append(record.get("session_id"));
                builder.append(record.get("user_id"));
                return builder.toString();
            }).process(new MatchFunction()).print();
        }
        catch (Exception e){
            e.printStackTrace();
        }
        env.execute("start session process");

    }

    public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord>  {
        @Override
        public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
            long timestamp = (long) record.get("event_ts");
            System.out.println("timestamp------"+ timestamp);
            return timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
            // simply emit a watermark with every event
            System.out.println("extractedTimestamp "+extractedTimestamp);
            return new Watermark(extractedTimestamp - 30000);
        }
    }

This is the code for the process function:

public class MatchFunction extends KeyedProcessFunction<String, GenericRecord, Object> {

    private ValueState<Tuple2<Long, PriorityQueue<GenericRecord>>> queueState = null;

    @Override
    public void open(Configuration config) throws Exception {
        System.out.println("open");
        ValueStateDescriptor<Tuple2<Long, PriorityQueue<GenericRecord>>> descriptor = new ValueStateDescriptor<>(
                "sorted-events", TypeInformation.of(new TypeHint<Tuple2<Long, PriorityQueue<GenericRecord>>>() {
        })
        );
        queueState = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
        Tuple2<Long, PriorityQueue<GenericRecord>> tuple = queueState.value();

        PriorityQueue<GenericRecord> records = tuple.f1;

    }

    @Override
    public void processElement(GenericRecord record, Context context, Collector<Object> collector) throws Exception {

        TimerService timerService = context.timerService();
        System.out.println("currentwatermark----"+ timerService.currentWatermark());
        if (context.timestamp() > timerService.currentWatermark()) {

            Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
            PriorityQueue<GenericRecord> queue = queueval.f1;
            long startTime = queueval.f0;
            System.out.println("starttime----"+ startTime);

            if (queue == null) {
                queue = new PriorityQueue<>(10, new TimeStampComprator());
                startTime = (long) record.get("event_ts");
            }
            queueState.update(new Tuple2<>(startTime, queue));
            timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
        }
    }

}

Recommended answer

Here's a possible explanation for what you've shared:

The TimestampsAndPunctuatedWatermarksOperator calls extractTimestamp before it calls checkAndGetNextWatermark for a given record. This means that the first time processElement in your MatchFunction is called in each task (parallel instance), the current watermark will be Long.MIN_VALUE (which is -9223372036854775808).
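
The ordering described above can be illustrated with a small plain-Java sketch that does not depend on Flink. It assumes that, for each record, the operator forwards the record downstream first and only then emits the watermark derived from it, so the downstream function always sees the watermark produced by the previous record. The names WatermarkOrderingSketch, Downstream, and run are hypothetical and exist only for this illustration; the timestamps and the 30000 ms lag are taken from the question.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkOrderingSketch {

    /** Toy stand-in for the downstream process function (not a Flink class). */
    static final class Downstream {
        // Before any watermark arrives, the current watermark is Long.MIN_VALUE.
        long currentWatermark = Long.MIN_VALUE;
        final List<Long> observed = new ArrayList<>();

        void processElement(long timestamp) {
            // Mirrors the println in the question: capture the watermark visible right now.
            observed.add(currentWatermark);
        }

        void processWatermark(long watermark) {
            // Watermarks never move backwards.
            currentWatermark = Math.max(currentWatermark, watermark);
        }
    }

    /** Feeds each timestamp through the assumed order: record first, watermark second. */
    public static List<Long> run(long[] eventTimestamps, long lagMillis) {
        Downstream downstream = new Downstream();
        for (long ts : eventTimestamps) {
            downstream.processElement(ts);               // the record is forwarded first
            downstream.processWatermark(ts - lagMillis); // its watermark follows
        }
        return downstream.observed;
    }

    public static void main(String[] args) {
        long[] timestamps = {1583128014000L, 1583128048000L, 1583128089000L};
        for (long seen : run(timestamps, 30_000L)) {
            System.out.println("currentwatermark----" + seen);
        }
    }
}
```

In this single-task model only the first record observes Long.MIN_VALUE; the second and third observe 1583127984000 and 1583128018000, that is, the previous record's timestamp minus 30000.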

If your parallelism is large enough, that could explain seeing

currentwatermark-----9223372036854775808

several times.
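
The effect of parallelism can be sketched with a deliberately simplified toy model, again in plain Java without Flink. It assumes that each parallel task keeps its own watermark starting at Long.MIN_VALUE and advances it only from the records it processes itself. Real Flink routes keys through key groups and combines watermarks from all input channels, which this sketch does not model. The class ParallelFirstElementSketch, the method observe, and the keys "a", "b", "c" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelFirstElementSketch {

    /** Returns the watermark each record would observe, in arrival order (toy model). */
    public static List<Long> observe(String[] keys, long[] timestamps,
                                     int parallelism, long lagMillis) {
        // Every task starts with no watermark, i.e. Long.MIN_VALUE.
        long[] taskWatermark = new long[parallelism];
        Arrays.fill(taskWatermark, Long.MIN_VALUE);

        List<Long> observed = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // Simplified routing by hash; Flink's real key-group assignment differs.
            int task = Math.floorMod(keys[i].hashCode(), parallelism);
            observed.add(taskWatermark[task]); // what processElement would print
            taskWatermark[task] = Math.max(taskWatermark[task], timestamps[i] - lagMillis);
        }
        return observed;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c"};
        long[] timestamps = {1583128014000L, 1583128048000L, 1583128089000L};
        // With 4 tasks, each key lands on a different task in this model.
        System.out.println(observe(keys, timestamps, 4, 30_000L));
        // With 1 task, only the first record sees Long.MIN_VALUE.
        System.out.println(observe(keys, timestamps, 1, 30_000L));
    }
}
```

Under these assumptions, three records with different keys and a parallelism of 4 each reach a fresh task, so all three observe Long.MIN_VALUE, which matches the shape of the log in the question. With a parallelism of 1, only the first record does.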
