Passing two streams to do operations with MainStreams in Flink Job

Question

Currently my Flink job has two streams: a main data stream that is updated every minute from a Kafka topic, and a second (broadcast) stream that is used in the processElement function of a KeyedBroadcastProcessFunction for some calculations with the main stream data.

Now I have a new requirement to add one more stream which is totally different in structure from the other two streams.

1) How can I pass in the third stream, which must be available in Flink state, so that it can be used in calculations together with the main data and the broadcast state data in the KeyedBroadcastProcessFunction?

2) Can we have two broadcast streams for the main data?

3) Joining will not work, because the streams contain completely different data. The broadcast stream and the third stream do not change often; the third one is something like master data that is used in the calculations along with the main data stream. I couldn't find any solution yet, please help, and please share some links I can refer to.

Answer

Flink does not offer any sort of process function with three inputs.

You could union the two broadcast streams together (before broadcasting them). I appreciate that they are very different types, but you can always find some way to make them co-exist. You can use Either for this if there isn't a more natural way to unify the two types. To union two disparate types into a single stream, you can do something like this:

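import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Either;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> strings = env.fromElements("one", "two", "three");
DataStream<Integer> ints = env.fromElements(1, 2, 3);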

DataStream<Either<String, Integer>> stringsOnTheLeft = strings
        .map(new MapFunction<String, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(String s) throws Exception {
                return Either.Left(s);
            }
        });

DataStream<Either<String, Integer>> intsOnTheRight = ints
        .map(new MapFunction<Integer, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(Integer i) throws Exception {
                return Either.Right(i);
            }
        });

DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);
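On the receiving side, processBroadcastElement then sees a single element type and has to branch on which side of the Either is set. The sketch below models that in plain Java so it runs without Flink. The Either class here is a hypothetical stand-in that mirrors the Left/Right factories and the isLeft()/left()/right() accessors of org.apache.flink.types.Either, and UnionDispatch is an illustrative name; its union() simply concatenates, whereas a real Flink union interleaves elements in no guaranteed order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.types.Either so the sketch runs without Flink.
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <L, R> Either<L, R> Left(L value) { return new Either<>(value, null, true); }
    static <L, R> Either<L, R> Right(R value) { return new Either<>(null, value, false); }

    boolean isLeft() { return isLeft; }
    L left() { return left; }
    R right() { return right; }
}

final class UnionDispatch {
    // Models union(): both inputs are wrapped into one element type.
    // Unlike Flink's union, this keeps a fixed order (all Lefts, then all Rights).
    static List<Either<String, Integer>> union(List<String> strings, List<Integer> ints) {
        List<Either<String, Integer>> out = new ArrayList<>();
        for (String s : strings) out.add(Either.Left(s));
        for (Integer i : ints) out.add(Either.Right(i));
        return out;
    }

    // Models the body of processBroadcastElement: branch on which side is set.
    static String describe(Either<String, Integer> element) {
        return element.isLeft()
                ? "string record: " + element.left()
                : "int record: " + element.right();
    }
}
```

In a real job the two branches would typically write to broadcast state under different keys rather than build strings.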

Or if you can apply the broadcast streams to the main stream in separate stages, then you could have a sequence of two KeyedBroadcastProcessFunctions, with the output of one feeding into the other:

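// events is a DataStream; broadcast1 and broadcast2 are BroadcastStreams created with
// .broadcast(descriptor); Process1 and Process2 are placeholder names for
// KeyedBroadcastProcessFunction subclasses.
events
    .keyBy(x -> x.foo)
    .connect(broadcast1)
    .process(new Process1())
    .keyBy(x -> x.foo)   // re-key the output of the first stage; it must still carry foo
    .connect(broadcast2)
    .process(new Process2());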
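In the chained layout each KeyedBroadcastProcessFunction only reads its own broadcast state, and whatever the first stage emits must still contain the field the second keyBy uses. Here is a plain-Java model of that data flow with no Flink dependency. Event, the rates and labels maps, and the enrichment logic are all hypothetical and stand in for what process1 and process2 would read from their respective broadcast states.

```java
import java.util.Map;

// Hypothetical event type; foo is the key field used by both keyBy calls.
final class Event {
    final String foo;
    final double amount;
    final String label; // filled in by the second stage

    Event(String foo, double amount, String label) {
        this.foo = foo;
        this.amount = amount;
        this.label = label;
    }
}

final class TwoStagePipeline {
    // Models process1: uses only the state written by broadcast1 (here, per-key rates).
    static Event stage1(Event e, Map<String, Double> rates) {
        double rate = rates.getOrDefault(e.foo, 1.0);
        return new Event(e.foo, e.amount * rate, e.label);
    }

    // Models process2: uses only the state written by broadcast2 (here, per-key labels).
    // It can consume stage1's output because that output still carries foo.
    static Event stage2(Event e, Map<String, String> labels) {
        return new Event(e.foo, e.amount, labels.getOrDefault(e.foo, "unknown"));
    }
}
```

The design trade-off is that neither stage can see the other's broadcast state, so this only fits when the calculations really can be split into independent steps.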

Update:

If we merge them like this and broadcast, will an update that arrives on either stream update the existing broadcast state, or will it create a new entry in the broadcast state?

That's entirely under your control. Broadcast state is always map state; I imagine you'd choose some sort of straightforward key to work with, so you'd have something like MapState<String, Either<T1, T2>>. Map state works like any hashmap: if you reuse a key it will replace the entry, if you introduce a new key, it will create a new entry.
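The replace-versus-insert behaviour can be modelled with an ordinary HashMap. This is a plain-Java sketch, not Flink's BroadcastState API: the class name BroadcastStateModel and its apply method are hypothetical, and V stands in for a value type such as Either<T1, T2>. apply reports whether the key was new, so both cases are visible.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of broadcast state as a String-keyed map (not the Flink API).
// V stands in for the value type, e.g. Either<T1, T2>.
final class BroadcastStateModel<V> {
    private final Map<String, V> state = new HashMap<>();

    // Returns true if the key was new (entry created),
    // false if an existing entry under that key was replaced.
    boolean apply(String key, V value) {
        boolean created = !state.containsKey(key);
        state.put(key, value);
        return created;
    }

    V get(String key) { return state.get(key); }

    int size() { return state.size(); }
}
```

So an update from either unioned stream overwrites whatever was stored under the same key and adds an entry otherwise; choosing the keys is what decides which of the two happens.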

... how can [I] provide a key common to these two [broadcast] streams?

The keys don't have to be the same, they just have to be of the same type.
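One hypothetical way to satisfy the "same type" requirement is to let each record kind derive a String key, prefixed by its kind so that equal ids from the two streams cannot overwrite each other. This sketch uses a common interface instead of Either (one possible "more natural way to unify the two types"); MasterData, RateUpdate, ThresholdUpdate, the prefixes and KeyedMasterData are all illustrative names, and a plain HashMap stands in for the broadcast state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical common supertype for the two broadcast record kinds.
interface MasterData {
    String stateKey();
}

// Hypothetical record kind from the first broadcast stream.
final class RateUpdate implements MasterData {
    final String currency;
    final double rate;

    RateUpdate(String currency, double rate) {
        this.currency = currency;
        this.rate = rate;
    }

    // The prefix keeps this key space separate from ThresholdUpdate's.
    public String stateKey() { return "rate:" + currency; }
}

// Hypothetical record kind from the second broadcast stream.
final class ThresholdUpdate implements MasterData {
    final String ruleId;
    final long limit;

    ThresholdUpdate(String ruleId, long limit) {
        this.ruleId = ruleId;
        this.limit = limit;
    }

    public String stateKey() { return "threshold:" + ruleId; }
}

final class KeyedMasterData {
    // Stores every update under its derived key; a later update with the same key replaces the earlier one.
    static Map<String, MasterData> load(Iterable<? extends MasterData> updates) {
        Map<String, MasterData> state = new HashMap<>();
        for (MasterData u : updates) state.put(u.stateKey(), u);
        return state;
    }
}
```

The keys differ in content but share one type, String, which is all the map-shaped broadcast state needs.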
