Flink: how to store state and use it in another stream?

Question

I have a use-case for Flink where I need to read information from a file, store each line, and then use this state to filter another stream.

I have all of this working right now with the connect operator and a RichCoFlatMapFunction, but it feels overly complicated. Also, I'm concerned that flatMap2 could begin executing before all of the state is loaded from the file:

fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;
        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

        @Override
        public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
            if (record.getPartId().equals(storedPartId.value())) {
                out.collect(record);
            } else {
                // do nothing
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>(
                            "partId", // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            null);
            storedPartId = getRuntimeContext().getState(descriptor);
        }
    });

Is there a better way (as of Flink 1.1.3) to accomplish this pattern of loading state, then using it in subsequent streams?

Recommended answer

Your concerns about the CoFlatMapFunction are correct. The order in which flatMap1 and flatMap2 are called cannot be controlled; it depends on the order in which data arrives. So flatMap2 might be called before flatMap1 has read all of the data.
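To make the ordering problem concrete, here is a minimal plain-Java sketch (not Flink code). The class `ArrivalOrderDemo` and its map-based "state" are hypothetical stand-ins for the keyed ValueState and the two flatMap methods in the question; the sketch only assumes that calls happen in arrival order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the keyed CoFlatMap in the question; not Flink code. */
class ArrivalOrderDemo {
    // Emulates keyed ValueState<String>: one stored part ID per key.
    private final Map<String, String> storedPartId = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    // Counterpart of flatMap1: remember a part ID that was read from the file.
    void flatMap1(String partId) {
        storedPartId.put(partId, partId);
    }

    // Counterpart of flatMap2: forward the record only if its ID is already stored.
    void flatMap2(String recordPartId) {
        if (recordPartId.equals(storedPartId.get(recordPartId))) {
            emitted.add(recordPartId);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // File line arrives first: the record passes the filter.
        ArrivalOrderDemo fileFirst = new ArrivalOrderDemo();
        fileFirst.flatMap1("p1");
        fileFirst.flatMap2("p1");
        System.out.println(fileFirst.emitted());   // prints [p1]

        // Record arrives first: the state is still empty, so the record is dropped.
        ArrivalOrderDemo recordFirst = new ArrivalOrderDemo();
        recordFirst.flatMap2("p1");
        recordFirst.flatMap1("p1");
        System.out.println(recordFirst.emitted()); // prints []
    }
}
```

The same two inputs give different results depending only on which one arrives first, which is the behaviour the question is worried about.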

The only way in Flink 1.1.3 to read all data before starting to process a stream is to consume the data in the open() method of a RichFlatMapFunction, i.e., you have to manually read and parse the file.
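The file-reading half of this approach does not depend on Flink, so it can be sketched in plain Java. The helper below is hypothetical (the name `PartIdLookup`, its methods, and the one-ID-per-line file format are assumptions, not part of the original post); it shows the work that open() would do once and the check that flatMap() would do per record.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper; assumes the file holds one part ID per line. */
class PartIdLookup {
    private final Set<String> partIds = new HashSet<>();

    // Meant to be called once from open(): reads the whole file
    // before the first stream record is processed.
    void load(Path file) throws IOException {
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            String id = line.trim();
            if (!id.isEmpty()) {
                partIds.add(id);
            }
        }
    }

    // Meant to be called from flatMap(): true if the record should be emitted.
    boolean contains(String partId) {
        return partIds.contains(partId);
    }
}
```

In the actual job, open() of the RichFlatMapFunction would call load(...) and flatMap() would forward a record only when contains(record.getPartId()) returns true. Because open() runs on the machines that execute the operator, the file has to be readable from each of them.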

This is basically a broadcast join strategy, i.e., each parallel instance of the operator will do this. The drawback is that the data of the file will be replicated. The benefit is that you do not have to shuffle the "main" stream (no need to use keyBy()).
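The trade-off can be illustrated with a small plain-Java simulation (again not Flink code; `BroadcastJoinDemo` and the round-robin distribution are assumptions made for the example). Every simulated instance holds a full copy of the file contents, so a record can be sent to any instance and the filter result stays the same.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java simulation of a broadcast-style lookup; not Flink code. */
class BroadcastJoinDemo {
    static List<String> run(List<String> fileLines, List<String> records, int parallelism) {
        // Drawback: the file contents are replicated once per parallel instance.
        List<Set<String>> copies = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            copies.add(new HashSet<>(fileLines));
        }

        // Benefit: records are spread round-robin, with no key-based routing,
        // yet every instance can answer the lookup locally.
        List<String> out = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            Set<String> local = copies.get(i % parallelism);
            if (local.contains(records.get(i))) {
                out.add(records.get(i));
            }
        }
        return out;
    }
}
```

Calling run with the same inputs and a parallelism of 1 or 3 returns the same filtered list; only the number of in-memory copies of the file changes.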
