KeyedProcessFunction implementation throwing null pointer exception?


Problem description

First example: from https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

I am trying to override processElement() of the KeyedProcessFunction class. processElement() takes three parameters, one of which is a context object. When I try to retrieve the timestamp from the context object, it throws a null pointer exception.

The line that throws the null pointer exception in the first example is:

current.lastModified = ctx.timestamp();

Second example: Example 6.5 from the book "Stream Processing with Apache Flink".

I have two ValueState variables declared in a class that extends KeyedProcessFunction. When I try to retrieve the last value updated in the state, it returns null.

The line that throws the null pointer exception in the second example is:

Double prevTemp = lastTemp.value(); if(prevTemp==0.0 || r.temperature < prevTemp) {}

First Example code
public class KeyedProcessFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, String>> stream =
                environment.socketTextStream("localhost",9090)
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String s) throws Exception {
                                String[] words= s.split(",");

                                return new Tuple2<>(words[0],words[1]);
                            }
                        });

        DataStream<Tuple2<String, Long>> result = stream
                .keyBy(0)
                .process(new CountWithTimeoutFunction());

        result.print();

        environment.execute("Keyed Process Function Example");

    }
    public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }

            // update the state's count
            current.count++;

            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();

            // write the state back
            state.update(current);

            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();

            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }
}

class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

Second example code

public class KeyedProcessFunctionTimerExample {
    public static void main(String[] args) throws Exception{
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use processing time for the application
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        DataStream<String> sensorData=
                env.addSource(new SensorSource())
                .keyBy(r -> r.id)
                .process(new TempIncreaseAlertFunction());

        sensorData.print();
        env.execute("Keyed Process Function execution");
    }

    public static class TempIncreaseAlertFunction extends KeyedProcessFunction<String, SensorReading, String> {

        private ValueState<Double> lastTemp;
        private ValueState<Long> currentTimer;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemp", Types.DOUBLE));
            currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", org.apache.flink.api.common.typeinfo.Types.LONG));
        }

        @Override
        public void processElement(
                SensorReading r,
                Context ctx,
                Collector<String> out) throws Exception {

            // get previous Temp
            Double prevTemp = lastTemp.value();

            // update last temp
            lastTemp.update(r.temperature);

            Long curTimerTimestamp = currentTimer.value();

            if(prevTemp==0.0 || r.temperature < prevTemp) {
                ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
                currentTimer.clear();
            }
            else if(r.temperature > prevTemp && curTimerTimestamp == 0) {
                Long timerTs = ctx.timerService().currentProcessingTime() + 1000;
                ctx.timerService().registerProcessingTimeTimer(timerTs);
                currentTimer.update(timerTs);

            }
        }

        @Override
        public void onTimer(
                long ts,
                OnTimerContext ctx,
                Collector<String> out) throws Exception {

            out.collect("Temperature of sensor ' " + ctx.getCurrentKey() + " ' monotonically increased for 1 second.");
            currentTimer.clear();

        }
    }

}

It should not throw a null pointer exception. Your help will be appreciated. Thanks!

Recommended answer

When working with event time in Flink you must arrange for the events to have timestamps, and for the streams to have watermarks. You do this by implementing a timestamp extractor and watermark generator, as described in the Flink documentation.
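
To see why a missing timestamp shows up as a null pointer exception on that particular line: Context.timestamp() returns a boxed Long, which is null when the record carries no timestamp, while lastModified is a primitive long, so the assignment auto-unboxes null. The sketch below reproduces this in plain Java without Flink. The class UnboxingNpeDemo and the method timestampOrNull are hypothetical stand-ins, not part of any Flink API.

```java
// Plain-Java sketch; no Flink dependency. All names here are hypothetical.
class UnboxingNpeDemo {

    // Stand-in for ctx.timestamp(): a boxed Long that is null
    // when the record has no timestamp attached.
    static Long timestampOrNull(boolean hasTimestamp) {
        return hasTimestamp ? Long.valueOf(1_000L) : null;
    }

    // Mirrors "current.lastModified = ctx.timestamp()" where the target
    // is a primitive long. Returns true if the assignment threw.
    static boolean assignmentThrows(boolean hasTimestamp) {
        try {
            long lastModified = timestampOrNull(hasTimestamp); // auto-unboxing happens here
            return lastModified < 0; // false for the 1000 ms stand-in value
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("with timestamp, throws: " + assignmentThrows(true));
        System.out.println("without timestamp, throws: " + assignmentThrows(false));
    }
}
```

In the first example the socket stream never has timestamps assigned, so every record reaches processElement() in the "without timestamp" situation. Assigning timestamps and watermarks on the stream before keyBy removes the null at its source.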

See also the tutorials.
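
The answer above does not cover the second example, so the following is only a sketch under an assumption: ValueState.value() returns null for a key that has no stored value yet (no default is set in the descriptor), which means prevTemp==0.0 and curTimerTimestamp == 0 both unbox null on the first reading for each sensor. One possible way to make the comparison null-safe is shown in plain Java below. The class TempIncreaseLogic, the Action enum, and the decide method are hypothetical helpers for illustration, not Flink API and not the book's code.

```java
// Plain-Java sketch of a null-safe version of the timer decision.
// All names are hypothetical; in a real job the two boxed arguments
// would come from lastTemp.value() and currentTimer.value().
class TempIncreaseLogic {

    /** What to do with the timer after seeing one reading. */
    enum Action { NONE, DELETE_TIMER, REGISTER_TIMER }

    static Action decide(Double prevTemp, Long currentTimer, double temperature) {
        if (prevTemp == null) {
            // First reading for this key: nothing to compare against yet.
            return Action.NONE;
        }
        if (temperature < prevTemp) {
            // Temperature dropped: remove the timer only if one is registered.
            return currentTimer != null ? Action.DELETE_TIMER : Action.NONE;
        }
        if (temperature > prevTemp && currentTimer == null) {
            // Temperature rose and no timer is pending: register one.
            return Action.REGISTER_TIMER;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, null, 20.0));
        System.out.println(decide(20.0, null, 25.0));
        System.out.println(decide(25.0, 5000L, 20.0));
    }
}
```

The design choice here is to treat null as "no previous value" instead of comparing against 0.0 or 0, since those comparisons force unboxing.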
