Kafka Streams wait function with depending objects

Problem description

I created a Kafka Streams application that receives different JSON objects from different topics, and I want to implement some kind of wait function, but I'm not sure how best to implement it.

To simplify the problem, I'll use simplified entities in the following; I hope they describe the problem well. In one of my streams I receive car objects, and every car has an id. In a second stream I receive person objects; every person also has a car id and is assigned to the car with that id.

I want my Kafka Streams application to read from both input streams (topics) and enrich each car object with the four persons that have the same car id. A car object should only be forwarded to the next downstream processor once all four persons have been added to it.

My plan is to create one input stream for the cars and one for the persons, parse the JSON data into the internal object representation, merge both streams, and apply a selectKey() function on the merged stream to extract the key (the car id) from each entity. After that I would push the data into a custom transform function that has a state store attached. Inside this transform function I would store every arriving car object in the state store under its id. As soon as a new person object arrives, I would add it to the corresponding car object in the state store (please ignore the case of late-arriving cars here). As soon as a car object contains four persons, I would forward it to the next stream function and remove it from the state store.
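The buffering logic planned for the transform function can be sketched in plain Java so that it runs without a broker. This is a minimal sketch under stated assumptions: the names CarEnricher, onCar and onPerson are hypothetical, persons are reduced to names, and a HashMap stands in for the Kafka Streams state store (it is not the Kafka Streams API). A car is buffered on arrival, persons are attached to it, and it is returned (forwarded) and evicted only once four persons are attached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical plain-Java model of the transformer's logic; the HashMap
// is only a stand-in for a Kafka Streams key-value state store.
class CarEnricher {

    // Simplified entity: a car id plus the persons assigned to it so far.
    static final class Car {
        final String id;
        final List<String> persons = new ArrayList<>();
        Car(String id) { this.id = id; }
    }

    private final int requiredPersons;
    // Stand-in for the state store: car id -> partially enriched car.
    private final Map<String, Car> store = new HashMap<>();

    CarEnricher(int requiredPersons) { this.requiredPersons = requiredPersons; }

    // A car arrives: buffer it under its id; nothing is forwarded yet.
    void onCar(String carId) {
        store.putIfAbsent(carId, new Car(carId));
    }

    // A person arrives: attach it to the buffered car. Returns the car only
    // once it holds the required number of persons, and evicts it from the
    // store. Persons whose car is not buffered are dropped, matching the
    // question's "ignore late-arriving cars" simplification.
    Optional<Car> onPerson(String carId, String personName) {
        Car car = store.get(carId);
        if (car == null) {
            return Optional.empty();
        }
        car.persons.add(personName);
        if (car.persons.size() < requiredPersons) {
            return Optional.empty();
        }
        store.remove(carId); // complete: forward downstream and clean up state
        return Optional.of(car);
    }

    int bufferedCars() { return store.size(); }

    public static void main(String[] args) {
        CarEnricher enricher = new CarEnricher(4);
        enricher.onCar("car-1");
        String[] persons = {"Ann", "Bob", "Cid", "Dee"};
        for (String p : persons) {
            enricher.onPerson("car-1", p)
                    .ifPresent(c -> System.out.println("forward " + c.id + " " + c.persons));
        }
        // prints "forward car-1 [Ann, Bob, Cid, Dee]" then "buffered cars: 0"
        System.out.println("buffered cars: " + enricher.bufferedCars());
    }
}
```

With a real KeyValueStore, the updated car has to be put() back after each person is added, because the store holds serialized copies rather than live objects; the transformer would then return the completed car (or forward it via its context) instead of an Optional.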

Would this be a suitable approach? I'm not sure about scalability: when running multiple instances, I have to make sure that car and person objects with the same id are processed by the same application instance. I would use the selectKey function for this. Would that work?

Thanks!

Recommended answer

The basic design looks sound to me.

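However, selectKey() by itself is not sufficient, because transform() (unlike the DSL's stateful operators) does not trigger automatic repartitioning. Thus, you need to repartition manually via through() (deprecated since Kafka 2.6 in favor of repartition(), which serves the same purpose):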

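stream.selectKey(...)                 // re-key every car and person record by its car id
      .through("user-created-topic")  // write to and read back from a topic you create yourself, repartitioning by the new key
      .transform(...);                // the stateful transformer now sees all records for a given car id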

https://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning
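To illustrate why selectKey() followed by a repartition is enough when scaling out, here is a plain-Java sketch of how a keyed record is mapped to a partition. It re-implements the murmur2 hash that Kafka's default partitioner applies to the serialized key, assuming String keys serialized as UTF-8; exact parity with a particular client version is an assumption, and RepartitionSketch and partitionFor are hypothetical names. Because car and person records carry the same key after selectKey(), they land in the same partition of the repartition topic, and each partition is processed by exactly one stream task, so all records for a car id reach the same state store no matter how many instances run.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of key-based partition assignment.
class RepartitionSketch {

    // murmur2 hash as used by Kafka's default partitioner for keyed records.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a String key: non-negative hash modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        int hash = murmur2(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Merged stream after selectKey(): every record is keyed by its car id,
        // whether it came from the car topic or the person topic.
        String[][] records = {
            {"car-1", "car"}, {"car-2", "car"},
            {"car-1", "person Ann"}, {"car-2", "person Bob"}, {"car-1", "person Cid"}
        };
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String[] rec : records) {
            int p = partitionFor(rec[0], partitions);
            byPartition.computeIfAbsent(p, x -> new ArrayList<>()).add(rec[0] + ": " + rec[1]);
        }
        // All records for one car id appear under a single partition.
        byPartition.forEach((p, recs) -> System.out.println("partition " + p + " -> " + recs));
    }
}
```

The same reasoning explains why repartitioning through a single topic avoids any need for the car and person input topics to be co-partitioned: the merge happens before the re-key, so only the repartition topic's partition count matters.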