Kafka Processor API: Different key for Source and StateStore?


Question

We are currently implementing a process (using the Kafka Processor API) where we need to combine the information from two correlated events (messages) on a topic and then forward that combined information. The events originate from IoT devices, and since we want to keep them in order, the source topic uses a device identifier as key. The events also contain a correlation ID:

Key

{ deviceId: "..." }

Message

{ deviceId: "...", correlationId: "...", data: ... }

Our first approach was to create a Processor with a connected state store that stores every incoming message, using the correlation ID as key. That enables us to query the store for the correlation ID of an incoming message; if there already is a message with the same ID in the store, we can combine the information, forward a new event, and remove the entry from the store. So for every correlation ID the following happens: at some point the first message with that ID is consumed and stored, and at some other point in time the second message with that ID results in the store entry being removed. (A rough sketch of this processor follows the key/value examples below.)

State Key

{ correlationId: "..." }

State Value

{ event: { deviceId: "...", correlationId: "...", data: ... }}
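
For illustration, here is a minimal sketch of such a processor. The class name PairingProcessor, the store name "pending-events", the use of String serdes, and the two helper methods are simplified placeholders, not our actual code; it uses the older Processor/ProcessorSupplier interfaces (newer Kafka Streams versions offer an equivalent api.Processor variant).

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Pairs two events that share a correlationId. Records arrive keyed by
// deviceId; the connected state store is keyed by correlationId instead.
public class PairingProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> pendingEvents;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // "pending-events" is an assumed name; it must match the store
        // name used when the store is added to the topology.
        this.pendingEvents =
            (KeyValueStore<String, String>) context.getStateStore("pending-events");
    }

    @Override
    public void process(final String deviceId, final String event) {
        final String correlationId = extractCorrelationId(event);
        final String earlierEvent = pendingEvents.get(correlationId);
        if (earlierEvent == null) {
            // First event for this correlationId: remember it.
            pendingEvents.put(correlationId, event);
        } else {
            // Second event: combine, forward downstream, and clean up.
            context.forward(deviceId, combine(earlierEvent, event));
            pendingEvents.delete(correlationId);
        }
    }

    @Override
    public void close() {}

    private String extractCorrelationId(final String event) {
        // Placeholder: parse the correlationId out of the event payload.
        throw new UnsupportedOperationException("implement payload parsing");
    }

    private String combine(final String first, final String second) {
        // Placeholder: merge the two payloads into the combined event.
        throw new UnsupportedOperationException("implement event merging");
    }
}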

But now we are wondering how Kafka Streams handles the different keys. We are using a microservice approach, and there will be multiple instances of that service running. The store is automatically backed by an internal topic. Consider re-scaling the service instances, such that the partitions of the source topic and the state topic are rebalanced. Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID? Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?

Thanks in advance!

Answer

If I understand your setup correctly, then yes, the approach is correct and (re)scaling will just work.

TL;DR: If a stream task is moved from machine A to machine B, then all of its state is moved as well, regardless of how that state is keyed (in your case it happens to be keyed by correlationId).

Details:

  • Kafka Streams assigns processing work to stream tasks. This happens by mapping input partitions to stream tasks in a deterministic manner, based on the message key in the input partitions (in your case: keyed by deviceId). This ensures that, even when stream tasks are moved across machines/VMs/containers, they will always see "their" input partitions, i.e. their input data.
  • A stream task consists, essentially, of the actual processing logic (in your case: your Processor API code) and any associated state (in your case: one state store keyed by correlationId). What's important for your question is that it does not matter how the state is keyed. Only the keying of the input partitions matters, because that determines which data flows from the input topic to a specific stream task (see the previous bullet point). When a stream task is moved across machines/VMs/containers, all of its state is moved with it, so it always has "its own" state available (see the wiring sketch after this list).
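
To make that concrete, here is a minimal wiring sketch for such a topology. The topic names, store name, and the PairingProcessor class from the question's sketch are assumed placeholders. Note that only the source topic's key (deviceId) influences how partitions are assigned to stream tasks; the store, keyed by correlationId, is simply attached to the processor and travels with its task.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class PairingTopology {

    public static Topology build() {
        // Store keyed by correlationId; names and serdes are assumptions.
        StoreBuilder<KeyValueStore<String, String>> pendingEventsStore =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("pending-events"),
                Serdes.String(),   // store key: correlationId
                Serdes.String());  // store value: serialized event

        return new Topology()
            // Source topic is keyed by deviceId; this key alone decides
            // which stream task (and hence which instance) gets a partition.
            .addSource("device-events-source", "device-events")
            .addProcessor("pairing-processor", PairingProcessor::new, "device-events-source")
            // The store is connected to the processor and moves with its task,
            // regardless of how the store itself is keyed.
            .addStateStore(pendingEventsStore, "pairing-processor")
            .addSink("combined-events-sink", "combined-events", "pairing-processor");
    }
}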

The store is automatically backed by an internal topic.

As you already suggested, the fact that a store is backed by an internal topic (for fault tolerance and for elastic scaling, because that internal topic is used to reconstruct a state store when its stream task is moved from A to B) is an implementation detail. As a developer using the Kafka Streams API, the handling of state store recovery is done automagically and transparently for you.

When a stream task is moved, and thus its state store(s) with it, Kafka Streams knows how to reconstruct the state store(s) at the new location of the stream task. You don't need to worry about that.
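
If you want to be explicit about that backing changelog topic, you can enable logging on the store builder. A minimal sketch, with the store name and serdes assumed as before; persistent stores are changelog-backed by default, so this only makes the default visible and lets you pass extra changelog topic configs.

import java.util.Collections;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class PendingEventsStore {

    public static StoreBuilder<KeyValueStore<String, String>> builder() {
        return Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("pending-events"),
                    Serdes.String(),   // key: correlationId
                    Serdes.String())   // value: serialized event
                // Empty map = default changelog topic configs; pass overrides here if needed.
                .withLoggingEnabled(Collections.emptyMap());
    }
}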

Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID?

No (which is good). A stream task will always know how to reconstruct its state (one or more state stores), regardless of how that state itself is keyed.

Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?

No (which is good).

