How to pick a Kafka transaction.id


Question

I wonder could I get some help understanding transactions in Kafka and in particular how I use transaction.id. Here's the context:

  1. My Kafka application follows the pattern: consume messages from an input topic, process them, then publish to an output topic.
  2. I am not using the Kafka Streams API.
  3. I have multiple consumers in one consumer group, each on its own polling thread.
  4. There is a pool of worker threads that do the message processing and publish to the output topic. At the moment, each thread has its own producer instance.
  5. I am using the published transactional API to ensure that the update of the consume offsets and the publishing to the output topic happen atomically.
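To make the setup concrete, here is a minimal sketch of the producer configuration such a worker thread might use. The config keys are real Kafka producer settings; the broker address, application id, and the one-id-per-worker naming scheme are illustrative assumptions:

```java
import java.util.Properties;

// Sketch of a transactional producer configuration for one worker thread.
// The config key names are real Kafka producer settings; the id scheme
// (one unique transactional.id per worker) mirrors the setup described above.
public class TxnProducerConfig {
    public static Properties forWorker(String appId, int workerIndex) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // Transactions require idempotence; the client enables it implicitly
        // once transactional.id is set, but being explicit documents intent.
        props.put("enable.idempotence", "true");
        // One producer instance per worker thread, each with its own id.
        props.put("transactional.id", appId + "-worker-" + workerIndex);
        return props;
    }
}
```

Each worker would then build a `KafkaProducer` from its own `Properties` instance.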

My assumptions to date have included:

  1. If my process crashes mid-transaction, then nothing in that transaction will be published and the consume offsets will not be advanced. So on restart, I would simply start the transaction again from the original consume offsets.
  2. For the producer transaction.id, all that matters is that it is unique. I could therefore generate a timestamp-based id at start-up.

Then I read the following blog: https://www.confluent.io/blog/transactions-apache-kafka/. In particular, the section "How to pick a transaction id" seems to imply that I need to guarantee a producer instance per input partition. It says: "The key to fencing out zombies properly is to ensure that the input topics and partitions in the read-process-write cycle is always the same for a given transactional.id." It further cites the problem example as follows: "For instance, in a distributed stream processing application, suppose topic-partition tp0 was originally processed by transactional.id T0. If, at some point later, it could be mapped to another producer with transactional.id T1, there would be no fencing between T0 and T1. So it is possible for messages from tp0 to be reprocessed, violating the exactly once processing guarantee."
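The scheme the blog describes, a fixed transactional.id per input topic-partition, could be sketched as a pure mapping. The naming convention here is an assumption for illustration, not something Kafka mandates; what matters is that the mapping is deterministic:

```java
// Derives a stable transactional.id from an input topic-partition, so that
// whichever worker picks up a given partition always registers the SAME id,
// and therefore fences out any previous (zombie) owner of that partition.
public class TxnIds {
    public static String forInputPartition(String appId, String topic, int partition) {
        // Deterministic: same input partition -> same transactional.id,
        // across restarts and across worker instances.
        return appId + "-" + topic + "-" + partition;
    }
}
```

Contrast this with a timestamp-based id, which is unique but maps a restarted partition owner to a *different* id, so no fencing relationship is established with the previous owner.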

I can't quite understand why this is the case. To my mind, I shouldn't care which producer handles messages from any partition as long as transactions are atomic. I've been struggling with this for a day and I wonder if someone could tell me what I've missed here. So, why can't I assign work to any producer instance with any transaction.id setting, as long as it is unique? And why do they say that messages can leak through the fencing provided by transactions if you do this?

Answer

The blog article you mentioned has all the information you're looking for, although it's rather dense.

From the aforementioned article:

Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly once processing semantics in the following ways:

  1. The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

Finally, in distributed environments, applications will crash or, worse, temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of "zombie instances." [emphasis added]

From the Transactional Semantics section in the same article.

Zombie fencing

We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts. [emphasis added]

The API requires that the first operation of a transactional producer should be to explicitly register its transactional.id with the Kafka cluster. When it does so, the Kafka broker checks for open transactions with the given transactional.id and completes them. It also increments an epoch associated with the transactional.id. The epoch is an internal piece of metadata stored for every transactional.id.

Once the epoch is bumped, any producers with the same transactional.id and an older epoch are considered zombies and are fenced off, i.e. future transactional writes from those producers are rejected. [emphasis added]
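The epoch mechanism quoted above can be illustrated with a toy model. To be clear, this is not Kafka code, just a simulation of the coordinator's bookkeeping, showing why a restarted producer with the *same* transactional.id fences out its predecessor while a producer with a *different* id would not:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the transaction coordinator's epoch bookkeeping.
// initTransactions bumps the epoch for a transactional.id; a write is
// accepted only if it carries the current epoch, so an older producer
// session (a "zombie") holding a stale epoch is rejected.
public class FencingModel {
    private final Map<String, Integer> epochs = new HashMap<>();

    // Called once per producer session; returns the epoch that session must use.
    public int initTransactions(String transactionalId) {
        int next = epochs.getOrDefault(transactionalId, 0) + 1;
        epochs.put(transactionalId, next);
        return next;
    }

    // A transactional write is accepted only at the current epoch for that id.
    public boolean tryWrite(String transactionalId, int epoch) {
        return epochs.getOrDefault(transactionalId, 0) == epoch;
    }
}
```

Note that registering a different id (say T1) creates a separate epoch counter, which is exactly why a zombie still holding T0 is not fenced in that case.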

From the Data Flow section in the same article.

A: Interaction between the producer and the transaction coordinator

When executing transactions, the producer makes requests to the transaction coordinator at the following points:

  1. The initTransactions API registers a transactional.id with the coordinator. At this point, the coordinator closes any pending transactions with that transactional.id and bumps the epoch to fence out zombies. This happens only once per producer session. [emphasis added]

When the producer is about to send data to a partition for the first time in a transaction, the partition is registered with the coordinator first.

When the application calls commitTransaction or abortTransaction, a request is sent to the coordinator to begin the two phase commit protocol.
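Put together, these steps map onto the Kafka Java client roughly as follows. This is a sketch, not runnable as-is: it assumes an already-configured `producer` (with transactional.id set) and `consumer` (with enable.auto.commit=false), and `process(...)` and `currentOffsets(...)` are placeholder helpers, not Kafka API:

```java
// One read-process-write cycle using the transactional producer API.
producer.initTransactions();       // register transactional.id, bump epoch (once per session)
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> rec : records) {
            producer.send(new ProducerRecord<>("output-topic", process(rec)));
        }
        // Commit the consumed offsets INSIDE the transaction, so the offset
        // update and the output writes commit (or abort) atomically.
        producer.sendOffsetsToTransaction(currentOffsets(records), consumer.groupMetadata());
        producer.commitTransaction(); // two-phase commit via the coordinator
    } catch (ProducerFencedException e) {
        // A newer session registered the same transactional.id; this instance
        // is a zombie and must shut down rather than retry.
        producer.close();
        throw e;
    } catch (KafkaException e) {
        producer.abortTransaction();  // roll back; the batch will be re-consumed
    }
}
```

The `ProducerFencedException` branch is where the per-partition id mapping pays off: it guarantees that whichever instance previously owned these input partitions has been fenced before this loop can commit anything.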

Hope this helps!

