Flink keyed stream key is null
Question
I am trying to perform a map operation on a KeyedStream in Flink:
stream.map(new JsonToMessageObjectMapper())
    .keyBy("keyfield")
    .map(new MessageProcessorStateful())
The output of the JsonToMessageObjectMapper operator is a POJO of class MessageObject, which has a String field 'keyfield'. The stream is then keyed on this field.
MessageProcessorStateful is a RichMapFunction that looks like this:
public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {
    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
            new MapStateDescriptor<>(
                "state", // the state name
                TypeInformation.of(new TypeHint<String>() {}),
                TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {})); // type information
        state = getRuntimeContext().getMapState(descriptor);
        state.put(...); // Insert a key, value here. Exception here!
    }
}
The code throws a NullPointerException:
Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
It seems that the key in the keyed state for one of the KeyedStreams is null, although I have verified that 'keyfield' is always a valid string. The rest seems to be correct according to the Flink documentation. Any idea what is going on?
Recommended answer
The problem is that you try to access the keyed state in the open() method.
Keyed state maintains a state instance for each key. In your example you are using MapState, so you have one MapState instance for each key. When accessing the state, you always get the state instance that corresponds to the key of the record currently being processed. In a MapFunction (like in your example) this is the record that is passed to the map() method.
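To make the scoping concrete, here is a toy model in plain Java. It is not Flink's implementation; the class KeyedStateModel and its methods are hypothetical names invented for this illustration. It only mimics the idea that every state access is resolved against a "current key" that the runtime sets before handing a record to the function.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed-state scoping. Hypothetical illustration only, NOT Flink code.
public class KeyedStateModel {
    // One inner map per key, mirroring "one MapState instance for each key".
    private final Map<String, Map<String, Long>> statePerKey = new HashMap<>();

    // In a real runtime this is set just before each record is processed.
    private String currentKey;

    public void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // Returns the state instance that belongs to the current key.
    public Map<String, Long> currentState() {
        if (currentKey == null) {
            throw new NullPointerException(
                "No key set. This method should not be called outside of a keyed context.");
        }
        return statePerKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    public static void main(String[] args) {
        KeyedStateModel model = new KeyedStateModel();

        // Analogous to open(): no record has arrived, so there is no current key.
        try {
            model.currentState().put("count", 0L);
        } catch (NullPointerException e) {
            System.out.println("Access without a key failed: " + e.getMessage());
        }

        // Analogous to map(): the key of the incoming record is set first.
        for (String key : new String[] {"a", "b", "a"}) {
            model.setCurrentKey(key);
            model.currentState().merge("count", 1L, Long::sum);
        }

        model.setCurrentKey("a");
        System.out.println("count for a = " + model.currentState().get("count"));
        model.setCurrentKey("b");
        System.out.println("count for b = " + model.currentState().get("count"));
    }
}
```

The two keys end up with independent counters, and the only failing access is the one made before any key was set.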
Since open() is not called with a record, the current key in open() is null and it is not possible to access the keyed state.
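One common way to restructure the function is to obtain only the state handle in open() and to defer every read or write to map(), initializing entries lazily. The following is a minimal sketch under several assumptions: it uses the Flink 1.x open(Configuration) signature from the question, it simplifies the value type to Long for brevity, the "count" entry is a made-up example, and getKeyfield() is an assumed getter on the question's MessageObject POJO. It needs Flink on the classpath to compile.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Sketch only: MessageObject is the POJO from the question,
// and getKeyfield() is an assumed getter for its 'keyfield' field.
public class MessageProcessorStateful
        extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    // Simplified value type (Long) chosen for illustration.
    private transient MapState<String, Long> state;

    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("state", String.class, Long.class);
        // Only obtain the handle here. No key is set yet,
        // so the state must not be read or written in open().
        state = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public Tuple2<String, String> map(MessageObject message) throws Exception {
        // Inside map() the current key is the key of 'message',
        // so state access is valid. Initialize lazily per key.
        if (!state.contains("count")) {
            state.put("count", 0L);
        }
        long count = state.get("count") + 1;
        state.put("count", count);
        return Tuple2.of(message.getKeyfield(), String.valueOf(count));
    }
}
```

With this layout the default entry is created the first time a record for a given key arrives, which is the earliest point at which a keyed context exists.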