Flink: how to store state and use it in another stream?

Problem description

I have a use-case for Flink where I need to read information from a file, store each line, and then use this state to filter another stream.

I have all of this working right now with the connect operator and a RichCoFlatMapFunction, but it feels overly complicated. Also, I'm concerned that flatMap2 could begin executing before all of the state is loaded from the file:

fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;
        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

        @Override
        public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
            if (record.getPartId().equals(storedPartId.value())) {
                out.collect(record);
            } else {
                // do nothing
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>(
                            "partId", // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            null);
            storedPartId = getRuntimeContext().getState(descriptor);
        }
    });

Is there a better way (as of Flink 1.1.3) to accomplish this pattern of loading state, then using it in subsequent streams?

Recommended answer

Your concerns about the CoFlatMapFunction are correct. The order in which flatMap1 and flatMap2 are called cannot be controlled; it depends on the order in which data arrives. So flatMap2 might be called before flatMap1 has read all of the data.

The only way in Flink 1.1.3 to read all data before starting to process a stream is to consume the data in the open() method of a RichFlatMapFunction, i.e., you have to manually read and parse the file.
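
A minimal plain-Java sketch of that idea, with no Flink dependency so it can be run on its own. The class name PartIdFileFilter, the file path argument, the one-ID-per-line file format, and the small PartRecord stand-in are all assumptions made for illustration. In an actual job, the two methods would presumably become open(Configuration) and flatMap(PartRecord, Collector<PartRecord>) of a RichFlatMapFunction<PartRecord, PartRecord>, with the Consumer replaced by Flink's Collector.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the PartRecord type used in the question.
class PartRecord {
    private final String partId;

    PartRecord(String partId) {
        this.partId = partId;
    }

    String getPartId() {
        return partId;
    }
}

// Plain-Java stand-in for the body of a RichFlatMapFunction<PartRecord, PartRecord>.
class PartIdFileFilter {
    private final String path;   // assumed: a file with one part ID per line
    private Set<String> partIds; // filled once in open(), before any record arrives

    PartIdFileFilter(String path) {
        this.path = path;
    }

    // Corresponds to open(): called once per parallel instance, so each
    // instance reads and parses the whole file by hand.
    void open() throws IOException {
        partIds = new HashSet<>();
        for (String line : Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8)) {
            String id = line.trim();
            if (!id.isEmpty()) {
                partIds.add(id);
            }
        }
    }

    // Corresponds to flatMap(): forward a record only if its part ID was in the file.
    void flatMap(PartRecord record, Consumer<PartRecord> out) {
        if (partIds.contains(record.getPartId())) {
            out.accept(record);
        }
    }
}
```

Because the set is filled in open(), it is complete before the first record is processed, which is exactly the guarantee the connect-based version cannot give. The stream itself needs no keyBy() in this variant.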

This is basically a broadcast join strategy, i.e., each parallel instance of the operator reads the file itself. The drawback is that the file's data is replicated in every parallel instance. The benefit is that you do not have to shuffle the "main" stream (no need to use keyBy()).