Kafka Streams wait function with depending objects

Question

I am building a Kafka Streams application that receives different JSON objects from different topics. I want to implement some kind of wait function, but I'm not sure how best to implement it.

To simplify the problem, I'll use simplified entities below; I hope they describe it well. In one of my streams I receive car objects, and every car has an id. In a second stream I receive person objects; every person also has a car id and is assigned to the car with that id.

I want my Kafka Streams application to read from both input streams (topics) and enrich each car object with the four persons that have the same car id. A car object should only be forwarded to the next downstream processor once all four persons have been added to it.

I plan to create one input stream for the car objects and one for the person objects, parse the JSON data into the internal object representation, merge both streams, and apply a "selectKey" function on the merged stream to extract the key from each entity. After that I would push the data into a custom transform function that has a state store attached. Inside this transform function I would store every arriving car object under its id in the state store. As soon as new person objects arrive, I would add them to the respective car object in the state store (please ignore the case of late-arriving cars here). As soon as a car object contains four persons, I would forward it to the next stream function and remove it from the state store.
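The buffering logic described above can be sketched in plain Java. This is only an illustration under stated assumptions: a HashMap stands in for the Kafka Streams state store, and the Car, Person, and CarAssembler classes with their fields are hypothetical names that do not come from the question. In a real application the same two handlers would live inside the custom transformer and read from and write to the actual state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical entities; field names are illustrative only.
final class Person {
    final String name;
    final String carId;
    Person(String name, String carId) { this.name = name; this.carId = carId; }
}

final class Car {
    final String id;
    final List<Person> persons = new ArrayList<>();
    Car(String id) { this.id = id; }
}

// Stand-in for the transformer: a HashMap plays the role of the state store.
final class CarAssembler {
    static final int REQUIRED_PERSONS = 4;
    private final Map<String, Car> store = new HashMap<>();

    // Buffer an arriving car under its id.
    void onCar(Car car) {
        store.put(car.id, car);
    }

    // Attach a person to the buffered car; return the car only once it is complete.
    Optional<Car> onPerson(Person person) {
        Car car = store.get(person.carId);
        if (car == null) {
            return Optional.empty(); // car not seen yet: out of scope, as in the question
        }
        car.persons.add(person);
        if (car.persons.size() < REQUIRED_PERSONS) {
            return Optional.empty(); // keep waiting for more persons
        }
        store.remove(car.id); // complete: remove from the store and hand downstream
        return Optional.of(car);
    }

    // Number of cars still waiting for persons.
    int buffered() {
        return store.size();
    }
}
```

An empty Optional corresponds to forwarding nothing; a present value corresponds to forwarding the completed car to the next processor.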

Would this be a suitable approach? I'm not sure about scalability, because when running multiple instances I have to make sure that car and person objects with the same id are processed by the same application instance. I would use the selectKey function for this; would that work?

Thanks!

Recommended answer

The basic design looks sound to me.

However, selectKey() alone is not sufficient, because transform() (in contrast to DSL operators) does not trigger auto-repartitioning. You therefore need to repartition manually via through().

stream.selectKey(...)
      .through("user-created-topic")
      .transform(...);

https://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning
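Why writing to a topic matters can be illustrated with a small sketch. selectKey() only changes the key carried by a record; the record stays in the task that read it. Only when the record is written to a topic is its partition chosen from the new key, so that cars and persons with the same car id end up in the same partition and therefore in the same instance. The code below is a stand-in, not Kafka's partitioner: the class and method names are hypothetical, and Arrays.hashCode replaces the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PartitionSketch {
    // Illustrative key-based routing: equal keys always map to the same partition.
    // Assumption: Arrays.hashCode stands in for Kafka's murmur2 hash.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions); // always in [0, numPartitions)
    }
}
```

Calling partitionFor with the car id for both a car record and a person record yields the same partition, which is the co-location the transformer's state store relies on. Note that in Kafka Streams 2.6 and later, through() is deprecated in favor of repartition(), which serves the same purpose with a topic managed by Kafka Streams.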
