Flink 键控流密钥为空 [英] Flink keyed stream key is null
问题描述
我正在尝试对 Flink 中的 KeyedStream 执行映射操作:
I am trying to perform a map operation on a KeyedStream in Flink:
stream.map(new JsonToMessageObjectMapper())
.keyBy("keyfield")
.map(new MessageProcessorStateful())
JsonToObjectMapper 操作符的输出是一个MessageObject 类的POJO,它有一个字符串字段'keyfield'.然后在这个字段上输入流.
The output of the JsonToObjectMapper operator is a POJO of class MessageObject which has a String field 'keyfield'. The stream is then keyed on this field.
MessageProcessorStateful 是一个 RichMapFunction,如下所示:
The MessageProcessorStateful is a RichMapFunction like this:
public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {
private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
...
@Override
public void open(Configuration config) throws Exception {
MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
new MapStateDescriptor<>(
"state", // the state name
TypeInformation.of(new TypeHint<String>() {}),
TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information
state = getRuntimeContext().getMapState(descriptor);
state.put(...); // Insert a key, value here. Exception here!
}
}
代码抛出 NullPointer 异常:
The code throws a NullPointer exception:
Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
似乎 KeyedState 中 KeyedStream 之一的键为空,尽管我已验证keyfield"始终是有效字符串.根据 Flink 文档,休息似乎是正确的.知道发生了什么吗?
Seems the key in the keyedState for one of the KeyedStream is null although I have verified that the 'keyfield' is always a valid string. Rest seems to be correct as per the Flink documentation. Any idea what is going on?
推荐答案
问题在于您尝试访问 open()
方法中的键控状态.
The problem is that you try to access the keyed state in the open()
method.
Keyed state 为每个键维护一个状态实例.在您的示例中,您使用的是 MapState
.所以每个键都有一个 MapState
实例.访问状态时,您将始终获得与当前处理记录的键对应的状态实例.在 MapFunction
(如您的示例中)中,这将是传递给 map()
方法的记录.
Keyed state maintains a state instance for each key. In your example you are using MapState
. So you have one MapState
instance for each key. When accessing the state, you'll always get the state instance that corresponds to the key of the currently processed record. In a MapFunction
(like in your example) this would be the record that is passed to the map()
method.
由于 open()
不是用记录调用的,所以 open()
中的当前键是 null
并且不可能访问键控状态.
Since open()
is not called with a record, the current key in open()
is null
and it is not possible to access the keyed state.
这篇关于Flink 键控流密钥为空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!