Join a static and a dynamic Kafka source in Flink


Question

Today, I'd like to address a conceptual topic about Flink rather than a technical one.

In our case, we have two Kafka topics, A and B, that need to be joined. The join should always include all elements from topic A, as well as all new elements from topic B. There are two possibilities to achieve this: always create a new consumer and start consuming topic A from the beginning, or keep all elements from topic A in state once consumed. Right now, the technical approach is to join two DataStreams, which quickly shows its limits for this use case, as there is no way to join streams without a window (fair enough). Elements from topic A are eventually lost as the window moves on, and I have the feeling that regularly resetting the consumer would bypass the elaborate logic Flink introduces.

The other approach I am looking at right now would be to use the Table API. It sounds like the best fit for this job, as it can actually keep all elements in its state for an indefinite amount of time.

My question, however: before going into the depths of the Table API only to notice there is a more elegant way, I'd like to find out whether this is the optimal solution for this problem, or whether there is an even better-fitting Flink concept I am not aware of.

I forgot to mention: we do not use POJOs, but rather keep it generic, which means the incoming data is identified as Tuple2&lt;K,V&gt;, where K and V are each an instance of GenericRecord. The corresponding schema for serialization/deserialization is obtained from the Schema Registry at runtime. I don't know to what extent the SQL constructs could become a bottleneck in this situation. Additionally, this remark from the documentation, "Both tables must have distinct field names", makes me a little doubtful, as we do have identical field names, which we will have to handle somehow without huge workarounds.

Answer

If A is truly static, then it will be less expensive if you can somehow fully ingest A, either into Flink state or into memory, and then stream B past A, thereby producing the join results without having to store B.
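To illustrate the idea, here is a minimal plain-Java sketch of that join logic (not actual Flink code, and the class, record, and method names are illustrative): topic A is fully materialized into a keyed map first, then each element of B is streamed past it and joined immediately, so B is never stored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: materialize the static topic A into a keyed map, then stream
// topic B past it. Only A is stored; matching B elements are joined and
// emitted immediately, never buffered.
public class StaticDynamicJoinSketch {
    // Stand-in for Tuple2<K, V>; in the real job K and V would be GenericRecords.
    record Element(String key, String value) {}
    record JoinResult(String key, String fromA, String fromB) {}

    public static List<JoinResult> run(List<Element> topicA, List<Element> topicB) {
        // Phase 1: fully ingest A (in Flink this would be keyed state,
        // e.g. bootstrapped via the State Processor API).
        Map<String, String> stateA = new HashMap<>();
        for (Element a : topicA) {
            stateA.put(a.key(), a.value());
        }
        // Phase 2: stream B past A, emitting one join result per match.
        List<JoinResult> out = new ArrayList<>();
        for (Element b : topicB) {
            String matched = stateA.get(b.key());
            if (matched != null) {
                out.add(new JoinResult(b.key(), matched, b.value()));
            }
        }
        return out;
    }
}
```

In a real Flink job the two phases would not be sequential method calls; the point of the answer below is precisely how to get phase 1 done before B starts flowing.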

There are at least a couple of ways to accomplish this with Flink. One is described in this answer, and the other involves using the State Processor API.

With this second approach you would hold A in key-partitioned Flink state. By using the State Processor API you can bootstrap a savepoint that contains the state you want, so that by starting your job from this savepoint, A is already fully loaded and immediately available.

There's a simple example of bootstrapping keyed state in this gist. Once you have created the savepoint, you need to implement a streaming job that uses it to compute the join, which can be done with a RichFlatMapFunction.

The other alternative for implementing the join without the Table API is to simply roll your own with a RichCoFlatMapFunction or a KeyedCoProcessFunction. You will find examples of this in the Flink training. None of those examples exactly matches your requirements, but they give the general flavor. I don't see any advantage to this, however: if you are going to do a fully dynamic/dynamic join, you might as well use the Table API.

