Flink keyed stream key is null

Problem description

I am trying to perform a map operation on a KeyedStream in Flink:

stream.map(new JsonToMessageObjectMapper())
                    .keyBy("keyfield")
                    .map(new MessageProcessorStateful())

The output of the JsonToMessageObjectMapper operator is a POJO of class MessageObject, which has a String field 'keyfield'. The stream is then keyed on this field.

The MessageProcessorStateful is a RichMapFunction like this:

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
                new MapStateDescriptor<>(
                        "state",                                        // the state name
                        TypeInformation.of(new TypeHint<String>() {}),
                        TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {})); // type information
        state = getRuntimeContext().getMapState(descriptor);

        state.put(...); // Insert a key, value here. Exception here!
    }
}

The code throws a NullPointerException:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

It seems that the key in the keyed state of the KeyedStream is null, although I have verified that 'keyfield' is always a valid string. The rest seems to be correct according to the Flink documentation. Any idea what is going on?

Recommended answer

The problem is that you are trying to access the keyed state in the open() method.

Keyed state maintains a state instance for each key. In your example you are using MapState, so there is one MapState instance per key. When you access the state, you always get the instance that corresponds to the key of the record currently being processed. In a MapFunction, as in your example, that is the record passed to the map() method.
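
To make the per-key scoping concrete, here is a minimal plain-Java model of keyed MapState. It is a simplified sketch and does not use Flink: the class KeyedMapStateModel and its methods setCurrentKey, put and get are hypothetical. It keeps one inner map per stream key, resolves every access against the current key, and fails with the same message as the stack trace above when no key is set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of keyed MapState (not Flink's real classes).
// One inner map exists per stream key; every access is resolved against the
// key of the record currently being processed.
class KeyedMapStateModel<K, UK, UV> {
    // stream key -> that key's private map contents
    private final Map<K, Map<UK, UV>> perKeyState = new HashMap<>();
    // key of the record being processed; null outside of record processing
    private K currentKey;

    // In Flink the runtime sets the key before each record; here the caller does it.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    private Map<UK, UV> currentMap() {
        // Mirrors the precondition that fails in the question's stack trace.
        Objects.requireNonNull(currentKey,
                "No key set. This method should not be called outside of a keyed context.");
        return perKeyState.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    void put(UK userKey, UV value) {
        currentMap().put(userKey, value);
    }

    UV get(UK userKey) {
        return currentMap().get(userKey);
    }
}
```

For example, after setCurrentKey("a") and put("count", 1), get("count") returns 1 while the current key is "a" but null after switching to "b", because "b" has its own empty map. Calling put before any key has been set throws a NullPointerException.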

Since open() is not called with a record, the current key in open() is null and it is not possible to access the keyed state.
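
A common way to fix the question's code is to keep only the state registration in open() (creating the MapStateDescriptor and calling getRuntimeContext().getMapState(descriptor) does not need a key) and to move state.put(...) into map(), where the runtime has already set the key of the incoming record. If the put in open() was meant to initialize the state, that can be done lazily in map() the first time a key is seen. The plain-Java simulation below sketches this call order under simplifying assumptions: it does not use Flink, KeyContext, CountingFunction and SimulatedKeyedOperator are hypothetical names, records are represented only by their key strings, and the per-key state is a single counter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the keyed state backend: one counter per key plus
// the "current key", which stays null until a record is being processed.
class KeyContext {
    private final Map<String, Integer> countsPerKey = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) {
        currentKey = key;
    }

    private String requireKey() {
        if (currentKey == null) {
            throw new NullPointerException(
                    "No key set. This method should not be called outside of a keyed context.");
        }
        return currentKey;
    }

    int getCount() {
        return countsPerKey.getOrDefault(requireKey(), 0); // 0 = lazy initial value
    }

    void setCount(int value) {
        countsPerKey.put(requireKey(), value);
    }
}

// Mirrors the corrected shape of MessageAdProcessorStateful:
// open() only obtains the state handle, map() reads and writes it.
class CountingFunction {
    private KeyContext state;

    void open(KeyContext ctx) {
        state = ctx;          // obtaining the handle needs no key
        // state.setCount(0); // would throw here, like state.put(...) in the question
    }

    String map(String record) {
        int next = state.getCount() + 1; // resolved against the record's key
        state.setCount(next);
        return record + "=" + next;
    }
}

// Hypothetical driver imitating the order in which the runtime calls the function.
class SimulatedKeyedOperator {
    static List<String> run(List<String> keyedRecords) {
        KeyContext ctx = new KeyContext();
        CountingFunction fn = new CountingFunction();
        fn.open(ctx);                    // once, before any record: no current key
        List<String> out = new ArrayList<>();
        for (String record : keyedRecords) {
            ctx.setCurrentKey(record);   // the key is set per record...
            out.add(fn.map(record));     // ...before map() is invoked
        }
        ctx.setCurrentKey(null);
        return out;
    }
}
```

Running SimulatedKeyedOperator.run(Arrays.asList("a", "b", "a")) yields [a=1, b=1, a=2]: each key sees only its own counter, and the commented-out line in open() would throw because no key is set yet.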
