Flink keyed stream key is null

Question

I am trying to perform a map operation on a KeyedStream in Flink:

stream.map(new JsonToMessageObjectMapper())
      .keyBy("keyfield")
      .map(new MessageProcessorStateful());

The output of the JsonToMessageObjectMapper operator is a POJO of class MessageObject, which has a String field 'keyfield'. The stream is then keyed on this field.

The MessageProcessorStateful is a RichMapFunction like this:

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
            new MapStateDescriptor<>(
                "state", // the state name
                TypeInformation.of(new TypeHint<String>() {}), // key type information
                TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {})); // value type information
        state = getRuntimeContext().getMapState(descriptor);

        state.put(...); // Insert a key, value here. Exception here!

    }
}

The code throws a NullPointerException:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

It seems that the key in the keyed state is null for one of the records in the KeyedStream, although I have verified that 'keyfield' is always a valid string. The rest seems to be correct as per the Flink documentation. Any idea what is going on?

Recommended answer

The problem is that you try to access the keyed state in the open() method.

Keyed state maintains a state instance for each key. In your example you are using MapState, so you have one MapState instance for each key. When accessing the state, you always get the state instance that corresponds to the key of the currently processed record. In a MapFunction (as in your example), that is the record passed to the map() method.

Since open() is not called with a record, the current key in open() is null and it is not possible to access the keyed state.
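
To make the point about the current key concrete, here is a toy model in plain Java. It is not Flink code: ToyKeyedBackend, ToyStatefulMapper and process() are hypothetical stand-ins invented for this sketch, and the real state backend is more involved. It only illustrates that state is looked up under whatever key the runtime has set, that no key is set before the first record arrives, and that open() should therefore only obtain a handle while reads and writes happen in map().

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code) of how keyed state is resolved against the current key. */
public class KeyedStateSketch {

    /** Hypothetical stand-in for a keyed state backend: one inner map per key. */
    static class ToyKeyedBackend {
        private final Map<String, Map<String, String>> statePerKey = new HashMap<>();
        private String currentKey; // stays null until a record is being processed

        void setCurrentKey(String key) {
            this.currentKey = key;
        }

        /** Returns the state of the current key, or fails if no key is set. */
        Map<String, String> currentState() {
            if (currentKey == null) {
                throw new NullPointerException("No key set.");
            }
            return statePerKey.computeIfAbsent(currentKey, k -> new HashMap<>());
        }
    }

    /** Hypothetical stand-in for a rich map function with open() and map(). */
    static class ToyStatefulMapper {
        private ToyKeyedBackend backend;

        // Called once, before any record: only keep a handle, do not touch state.
        void open(ToyKeyedBackend backend) {
            this.backend = backend;
        }

        // Called per record: the caller has already set the current key.
        String map(String key, String value) {
            Map<String, String> state = backend.currentState();
            String previous = state.put("last", value); // previous value for this key, or null
            return key + ": " + previous + " -> " + value;
        }
    }

    /** Returns true if touching state before any key is set fails. */
    static boolean accessWithoutKeyFails() {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        try {
            backend.currentState();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Simulates the runtime: set the key of the record, then call map(). */
    static String process(ToyKeyedBackend backend, ToyStatefulMapper mapper,
                          String key, String value) {
        backend.setCurrentKey(key);
        return mapper.map(key, value);
    }

    public static void main(String[] args) {
        System.out.println(accessWithoutKeyFails()); // true
        ToyKeyedBackend backend = new ToyKeyedBackend();
        ToyStatefulMapper mapper = new ToyStatefulMapper();
        mapper.open(backend);
        System.out.println(process(backend, mapper, "a", "1")); // a: null -> 1
        System.out.println(process(backend, mapper, "b", "2")); // b: null -> 2
        System.out.println(process(backend, mapper, "a", "3")); // a: 1 -> 3
    }
}
```

Applied to the function in the question, the analogous change would be to keep getRuntimeContext().getMapState(descriptor) in open() and move the state.put(...) call into map(), where the key of the current record is available.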
