KeyedProcessFunction implementation throwing null pointer exception?
Question
First example: from https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
I am trying to override processElement() of the KeyedProcessFunction class. processElement() takes three parameters, one of which is a context object. When I try to retrieve the timestamp from the context object, it throws a null pointer exception.
The line that throws the null pointer exception in the first example is
current.lastModified = ctx.timestamp();
Second example: Example 6.5 of the book "Stream Processing with Apache Flink".
I have two ValueState variables declared in a class that extends KeyedProcessFunction. When I try to retrieve the last value stored in the state, it returns null.
The line that throws the null pointer exception in the second example is
Double prevTemp = lastTemp.value(); if(prevTemp==0.0 || r.temperature < prevTemp) {}
First example code
public class KeyedProcessFunctionExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, String>> stream =
                environment.socketTextStream("localhost", 9090)
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String s) throws Exception {
                                String[] words = s.split(",");
                                return new Tuple2<>(words[0], words[1]);
                            }
                        });

        DataStream<Tuple2<String, Long>> result = stream
                .keyBy(0)
                .process(new CountWithTimeoutFunction());

        result.print();
        environment.execute("Keyed Process Function Example");
    }

    public static class CountWithTimeoutFunction
            extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }
            // update the state's count
            current.count++;
            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();
            // write the state back
            state.update(current);
            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();
            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }
}

class CountWithTimestamp {
    public String key;
    public long count;
    public long lastModified;
}
Second example code
public class KeyedProcessFunctionTimerExample {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use processing time for the application
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        DataStream<String> sensorData =
                env.addSource(new SensorSource())
                        .keyBy(r -> r.id)
                        .process(new TempIncreaseAlertFunction());

        sensorData.print();
        env.execute("Keyed Process Function execution");
    }

    public static class TempIncreaseAlertFunction
            extends KeyedProcessFunction<String, SensorReading, String> {

        private ValueState<Double> lastTemp;
        private ValueState<Long> currentTimer;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTemp = getRuntimeContext().getState(
                    new ValueStateDescriptor<Double>("lastTemp", Types.DOUBLE));
            currentTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("timer", org.apache.flink.api.common.typeinfo.Types.LONG));
        }

        @Override
        public void processElement(
                SensorReading r,
                Context ctx,
                Collector<String> out) throws Exception {
            // get previous temperature
            Double prevTemp = lastTemp.value();
            // update last temperature
            lastTemp.update(r.temperature);
            Long curTimerTimestamp = currentTimer.value();
            if (prevTemp == 0.0 || r.temperature < prevTemp) {
                ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
                currentTimer.clear();
            } else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
                Long timerTs = ctx.timerService().currentProcessingTime() + 1000;
                ctx.timerService().registerProcessingTimeTimer(timerTs);
                currentTimer.update(timerTs);
            }
        }

        @Override
        public void onTimer(
                long ts,
                OnTimerContext ctx,
                Collector<String> out) throws Exception {
            out.collect("Temperature of sensor ' " + ctx.getCurrentKey() + " ' monotonically increased for 1 second.");
            currentTimer.clear();
        }
    }
}
It should not throw a null pointer exception. Any help would be appreciated. Thanks!
Recommended answer
When working with event time in Flink, you must arrange for the events to have timestamps and for the streams to have watermarks. You do this by implementing a timestamp extractor and a watermark generator.
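To see why a missing timestamp surfaces as a null pointer exception, it may help to look at the types involved. Context.timestamp() returns a boxed Long, which can be null when the record carries no timestamp, and ValueState.value() returns null until update() has been called for the current key (unless a default value was configured). Assigning or comparing such a null against a primitive triggers auto-unboxing, which throws. The sketch below reproduces this in plain Java without Flink; the class and method names are hypothetical stand-ins, and the null-safe comparison is only one possible way to guard the check from the second example.

```java
public class NullUnboxingSketch {

    // Hypothetical stand-in for ctx.timestamp(): a boxed Long that is null
    // when no timestamp has been assigned to the record.
    public static Long missingTimestamp() {
        return null;
    }

    // Hypothetical stand-in for lastTemp.value(): a boxed Double that is null
    // until a value has been stored for the current key.
    public static Double emptyState() {
        return null;
    }

    // Returns true if assigning the boxed value to a primitive long throws.
    public static boolean unboxingThrows(Long boxed) {
        try {
            long primitive = boxed; // auto-unboxing calls boxed.longValue()
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Null-safe variant of the comparison from the second example (an assumption,
    // not the book's code): a missing previous temperature is treated the same
    // way as a temperature drop.
    public static boolean shouldClearTimer(Double prevTemp, double temperature) {
        return prevTemp == null || temperature < prevTemp;
    }

    public static void main(String[] args) {
        System.out.println(unboxingThrows(missingTimestamp()));    // true
        System.out.println(unboxingThrows(42L));                    // false
        System.out.println(shouldClearTimer(emptyState(), 20.0));  // true
        System.out.println(shouldClearTimer(25.0, 20.0));           // true
        System.out.println(shouldClearTimer(15.0, 20.0));           // false
    }
}
```

In the first example the unboxing happens when the result of ctx.timestamp() is assigned to the primitive field lastModified; in the second it happens in prevTemp==0.0, and again wherever curTimerTimestamp is used as a primitive.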
See also the tutorials.
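As a rough illustration of what the timestamp extractor and watermark generator mentioned in the answer compute, here is a simplified model in plain Java. It is not the Flink API: Event, SimpleWatermarkGenerator and timerFires are hypothetical names, and the model assumes a bounded out-of-orderness strategy in which the watermark trails the largest timestamp seen so far by a fixed bound. In a real job you would attach an equivalent assigner to the stream before keyBy.

```java
public class WatermarkSketch {

    // Hypothetical event type: a key plus an event-time timestamp in milliseconds.
    public static final class Event {
        public final String key;
        public final long timestampMillis;

        public Event(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    // Timestamp extractor: reads the event time carried by the record itself.
    public static long extractTimestamp(Event event) {
        return event.timestampMillis;
    }

    // Simplified watermark generator (an assumption, not Flink's implementation):
    // the watermark is the largest timestamp seen minus the allowed out-of-orderness.
    public static final class SimpleWatermarkGenerator {
        private final long maxOutOfOrdernessMillis;
        private long maxTimestampSeen;

        public SimpleWatermarkGenerator(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
            // Start low enough that the subtraction below cannot underflow.
            this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
        }

        public void onEvent(long timestampMillis) {
            maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
        }

        public long currentWatermark() {
            return maxTimestampSeen - maxOutOfOrdernessMillis;
        }
    }

    // In this model an event-time timer fires once the watermark reaches its timestamp.
    public static boolean timerFires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        SimpleWatermarkGenerator generator = new SimpleWatermarkGenerator(500);

        Event first = new Event("a", 1_000);
        generator.onEvent(extractTimestamp(first));
        // Timer 60 seconds after the first event, as in the first example.
        long timer = extractTimestamp(first) + 60_000;

        System.out.println(generator.currentWatermark());                    // 500
        System.out.println(timerFires(timer, generator.currentWatermark())); // false

        generator.onEvent(extractTimestamp(new Event("a", 62_000)));
        System.out.println(generator.currentWatermark());                    // 61500
        System.out.println(timerFires(timer, generator.currentWatermark())); // true
    }
}
```

The model also shows why event-time timers never fire without watermarks: nothing advances the value that the timer timestamp is compared against.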