Kafka Processor API: Different key for Source and StateStore?


Question

We are currently implementing a process (using the Kafka Processor API) where we need to combine information from two correlated events (messages) on a topic and then forward that combined information. The events originate from IoT devices, and since we want to keep them in order, the source topic uses a device identifier as key. The events also contain a correlation ID:

Key

{ deviceId: "..." }

Message

{ deviceId: "...", correlationId: "...", data: ... }

Our first approach was to create a Processor with a connected State Store that stores every incoming message, using the correlation ID as key. That enables us to query the store for the correlation ID of an incoming message; if there is already a message with the same ID in the store, we can combine the information, forward a new event, and remove the entry from the store. So for every correlation ID the following happens: at some point the first message with that ID is consumed and stored, and at some other point in time the second message with that ID results in the store entry being removed.

Store Key

{ correlationId: "..." }

Store Value

{ event: { deviceId: "...", correlationId: "...", data: ... }}
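
A minimal sketch of the approach described above, assuming a Kafka Streams 2.x version with the classic Processor interface (newer releases also offer a Record-based api.Processor variant). The Event type, its correlationId field, the Event.combine helper, and the store name "correlation-store" are all hypothetical placeholders, not the actual code from the question:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch only: Event and Event.combine(...) are hypothetical placeholders
// for the IoT message type described above.
public class CorrelationProcessor implements Processor<String, Event> {

    private ProcessorContext context;
    private KeyValueStore<String, Event> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // The store must be registered under this name when the topology is built.
        this.store = (KeyValueStore<String, Event>) context.getStateStore("correlation-store");
    }

    @Override
    public void process(final String deviceId, final Event event) {
        final Event earlier = store.get(event.correlationId);
        if (earlier == null) {
            // First event for this correlation ID: buffer it, keyed by correlationId.
            store.put(event.correlationId, event);
        } else {
            // Second event: combine both, forward the result, and drop the buffered entry.
            context.forward(deviceId, Event.combine(earlier, event));
            store.delete(event.correlationId);
        }
    }

    @Override
    public void close() { }
}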

But now we are wondering how Kafka Streams handles the different keys. We are using a microservice approach, and there will be multiple instances of that service running. The store is automatically backed by an internal topic. Consider re-scaling the service instances, such that the partitions of the source topic and the state topic are rebalanced. Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID? Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?

Thanks in advance!

Answer

If I understand your setup correctly, then yes, the approach is correct and (re)scaling will just work.

TL;DR: If a stream task is moved from machine A to machine B, then all its state will be moved as well, regardless of how that state is keyed (in your case it happens to be keyed by correlationId).

In more detail:

  • Kafka Streams assigns processing work to stream tasks. This happens by mapping input partitions to stream tasks in a deterministic manner, based on the message key in the input partitions (in your case: keyed by deviceId). This ensures that, even when stream tasks are moved across machines/VMs/containers, they always see "their" input partitions, i.e. their input data.
  • A stream task consists, essentially, of the actual processing logic (in your case: the Processor API code) and any associated state (in your case: one state store keyed by correlationId); see the wiring sketch after this list. What matters for your question is that it does not matter how the state is keyed. Only the keying of the input partitions matters, because that determines which data flows from the input topic to a specific stream task (see the previous bullet point). When a stream task is moved across machines/VMs/containers, all its state moves with it, so that it always has "its own" state available.
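
To illustrate how the deviceId-keyed input and the correlationId-keyed store end up in the same stream task, here is a minimal topology wiring sketch. The topic names ("iot-events", "combined-events"), the store name ("correlation-store"), the Event type and its Serde are assumed placeholders, and the classic Processor API is used; this is a sketch under those assumptions, not the poster's actual setup:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CorrelationTopology {

    // eventSerde is a hypothetical Serde for the Event type used above.
    static Topology build(final Serde<Event> eventSerde) {
        // The store is keyed by correlationId, independently of the deviceId-keyed input.
        // Changelog logging is on by default; shown explicitly here for clarity.
        final StoreBuilder<KeyValueStore<String, Event>> storeBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("correlation-store"),
                    Serdes.String(),          // store key: correlationId
                    eventSerde)               // store value: the buffered event
                .withLoggingEnabled(java.util.Collections.emptyMap());

        final Topology topology = new Topology();
        // Source topic is partitioned by deviceId (the record key).
        topology.addSource("events-source",
                Serdes.String().deserializer(), eventSerde.deserializer(), "iot-events");

        final ProcessorSupplier<String, Event> correlatorSupplier = CorrelationProcessor::new;
        topology.addProcessor("correlator", correlatorSupplier, "events-source");

        // Attaching the store to the processor ties it to the same stream tasks that
        // consume the deviceId-partitioned input, so store and input always move together.
        topology.addStateStore(storeBuilder, "correlator");

        topology.addSink("combined-sink", "combined-events",
                Serdes.String().serializer(), eventSerde.serializer(), "correlator");
        return topology;
    }
}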

The store is automatically backed by an internal topic.

As you already suggested, the fact that a store is backed by an internal topic (for fault tolerance and for elastic scaling, because that internal topic is used to reconstruct the state store when its stream task is moved from A to B) is an implementation detail. As a developer using the Kafka Streams API, state store recovery is handled automagically and transparently for you.

When a stream task is moved, and thus its state store(s) with it, Kafka Streams knows how to reconstruct the state store(s) at the task's new location. You don't need to worry about that.
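
As a side note, not something required for correctness: if the changelog replay after a task movement ever becomes a latency concern, Kafka Streams can keep warm copies of the state on other instances via num.standby.replicas. A minimal configuration sketch; the application id and bootstrap servers below are placeholders:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class CorrelatorConfig {

    static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iot-correlator");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");     // placeholder
        // Optional: keep a standby copy of each state store on another instance,
        // so a relocated task can take over without first replaying the full changelog.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}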

Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID?

No (which is good). A stream task will always know how to reconstruct its state (1+ state stores), regardless of how that state itself is keyed.

Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?

No (which is good).
