Keep keyed state across multiple transformations

Question

I have a stream that I want to partition using a certain key, and then run through several transformations, each using a state. When I call keyBy() I get a KeyedStream and the next transformation can access a partitioned state correctly, but another transformation chained after that gets an exception when trying to access a partitioned state. The exception is:

State key serializer has not been configured in the config. This operation cannot use partitioned state

It seems that the key information is only passed to the first transformation and not further down the chain.

The code I try to run is along the lines of this code (but actually does something):

DataStream<Long> newStream = eventsStream
    .keyBy("username")
    .filter(new RichFilterFunction<Event>() {
        private ValueState<Boolean> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE1", Boolean.class, Boolean.TRUE));
        }

        @Override
        public boolean filter(Event value) throws Exception {
            return stateStore.value();
        }
    })
    .map(new RichMapFunction<Event, Long>() {
        private ValueState<Long> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE2", Long.class, 0L));
        }

        @Override
        public Long map(Event value) throws Exception {
            return Long.parseLong(value.data) + stateStore.value();
        }
    });

This code will throw an exception at the second getState() call.

I can call keyBy() again, but then I remove the ability to chain the operations. Can I manually manipulate the objects of the stream graph so that the key information is passed, or is this sort of chaining not supported?

Answer

You can't.

Even if you called keyBy() a second time (or somehow passed the "keyed" information downstream), you would get a new state, because a state is associated with a single operator only.
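To make the per-operator scoping concrete, here is a toy model in plain Java. It is not Flink's API; the class and method names are hypothetical. It assumes keyed state is addressed by operator, key, and state name together, so two operators reading the same key under the same state name still see separate values.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Flink's API: keyed state is looked up by (operator, key, state name).
class PerOperatorKeyedState {
    private final Map<List<String>, Object> store = new HashMap<>();

    // Writes a value visible only to this operator, for this key, under this state name.
    void put(String operator, String key, String stateName, Object value) {
        store.put(Arrays.asList(operator, key, stateName), value);
    }

    // Reads the value, falling back to a default when this operator never wrote it.
    Object get(String operator, String key, String stateName, Object defaultValue) {
        return store.getOrDefault(Arrays.asList(operator, key, stateName), defaultValue);
    }
}
```

In this model, a "map" operator never sees a value written by a "filter" operator for the same user, even when the key and the state name match.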

As a workaround, you need to merge both operators into one.
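A minimal sketch of that merge, assuming the question's filter and map are fused into one keyed operator. To keep it runnable without a Flink dependency, the two ValueState handles are modeled as per-key HashMaps. In an actual job the same logic would sit in a single RichFlatMapFunction applied directly to the KeyedStream returned by keyBy(), with both states obtained through getRuntimeContext().getState(...) in open(). The Event class, the method names, and the setter helpers are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of ONE operator doing the question's filter + map.
// In Flink, value1/value2 would be ValueState<Boolean>/ValueState<Long>,
// implicitly scoped to the current key; here the key is passed explicitly.
class MergedFilterMap {

    // Stand-in for the question's Event type, with only the fields the snippet uses.
    static final class Event {
        final String username;
        final String data;

        Event(String username, String data) {
            this.username = username;
            this.data = data;
        }
    }

    // Per-key state; defaults mirror the descriptors: VALUE1 = TRUE, VALUE2 = 0L.
    private final Map<String, Boolean> value1 = new HashMap<>();
    private final Map<String, Long> value2 = new HashMap<>();

    // Filter step and map step in one call; an empty result means the event was dropped.
    Optional<Long> process(Event event) {
        boolean keep = value1.getOrDefault(event.username, Boolean.TRUE);
        if (!keep) {
            return Optional.empty();
        }
        long offset = value2.getOrDefault(event.username, 0L);
        return Optional.of(Long.parseLong(event.data) + offset);
    }

    // Helpers that update state, standing in for whatever real logic would write.
    void setKeep(String username, boolean keep) {
        value1.put(username, keep);
    }

    void setOffset(String username, long offset) {
        value2.put(username, offset);
    }
}
```

In the Flink version, dropping an event just means not calling out.collect(...) inside flatMap, and because the operator sits directly on the KeyedStream, both getState() calls have access to the key.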

If you think this feature might be helpful, feel free to suggest it at dev@flink.apache.org.
