使用幂等 Kafka Producer 时的排序保证 [英] Ordering guarantees when using idempotent Kafka Producer

查看:32
本文介绍了使用幂等 Kafka Producer 时的排序保证的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的应用程序中使用了 Kafka 1.0.1,并且我已经开始使用 0.11 中引入的幂等生产者功能,但我在使用幂等功能时无法理解排序保证.

我的生产者的配置是:

enable.idempotence = true

max.in.flight.requests.per.connection = 5

重试 = 50

acks = all

根据文档:

重试

<块引用>

设置一个大于零的值将导致客户端重新发送任何发送失败并可能出现暂时性错误的记录.请注意,此重试与客户端在收到错误后重新发送记录的情况没有什么不同.允许重试而不将 max.in.flight.requests.per.connection 设置为 1 可能会更改记录的顺序,因为如果将两个批次发送到单个分区,第一个失败并重试但第二个成功,则记录第二批可能会先出现.

enable.idempotence

<块引用>

当设置为true"时,生产者将确保每条消息的副本都准确地写入流中.如果为false",则生产者由于代理失败等而重试,可能会在流中写入重试消息的副本.请注意,启用幂等性要求 max.in.flight.requests.per.connection 小于或等于 5,重试次数大于 0 并且 acks 必须为全部".如果用户未明确设置这些值,则会选择合适的值.如果设置了不兼容的值,则会抛出 ConfigException.

我的配置好像符合要求,但是好像不对齐.

另一个与 OutOfOrderSequenceException:根据文档,如果我收到此异常,则意味着生产者有出现故障的风险.但是,如果我的生产者配置了 max.in.flight.requests.per.connection = 5 并且假设第二个请求出现乱序异常,那么以下所有请求会发生什么已经在飞行中?这是否意味着我肯定出了问题?

解决方案

在 KafkaProducer 中启用幂等性时保证排序.

即使 max.in.flight.requests.per.connection 大于或等于 1,幂等的 KafkaProducer 仍将确保 TopicPartition 内的排序.关于重试"的描述与 max.in.flight 的关系仅适用于禁用幂等性.

一个幂等的 KafkaProducer 使用内部递增的序列号来确保最多 5 个 in.flight 请求的排序,如 Pull-Request #3743:

<块引用>

[...] 我们保留 5 个较旧批次的记录元数据."

此外,enable.idempotence 上的文档告知了最多具有5 飞行请求.否则你会得到一个 ConfigException.

更多详细信息在 KAFKA-5494 和相应文档中提供为 max.in.flight 设计 >1 启用幂等性.其中第 5 步和第 6 步将澄清您的问题:

在上述假设到位的情况下,解决方案如下:

  1. 我们跟踪发送到分区的最后一个确认序列.这会在每次成功确认时更新,因此应始终增加.

  2. 我们会跟踪给定分区的批量绑定的下一个序列.

  3. 当批次排空时,我们分配了 nextSequence.我们还将 nextSequence 增加了一个批次的记录数.

  4. 如果生产请求成功,我们将最后一个确认序列设置为批次的最后一个序列.

  5. 如果生产请求失败,成功的飞行批次也将失败,并出现 OutOfOrderSequenceException.

  6. 因此,如果一个批次的序列号不是最后一个确认序列的后继序列,并且它因 OutOfOrderSequenceException 而失败,我们认为这是可重试的.

  7. 当一个批次重新排队时,我们会在将其插入队列之前删除生产者 ID 和序列号.

  8. 当第一个飞行批次失败(无论出于何种原因)时,我们将 nextSequence 重置为 lastAckdSequence + 1.

  9. 因此,如果一个批次严重失败,则后续批次的序列号在重试时会有所不同.这很好,因为之前的失败是 OutOfSequence 异常,这绝对意味着请求被拒绝.

<块引用>

"[...] 如果我的生产者配置为 max.in.flight.requests.per.connection = 5 并且假设第二个请求出现乱序异常,以下所有情况会发生什么已经在进行中的请求?这是否意味着我肯定出了问题?"

如第 5 步和第 6 步所述,所有成功的动态请求也将失败,并显示 retriable OutOfOrderSequenceExcpetion.由于 retries 大于 0,幂等的 KafkaProducer 将能够保持顺序.

I am using Kafka 1.0.1 in my application and I have started using the Idempotent Producer feature that was introduced in 0.11, and I've having trouble understanding the ordering guarantees when using the Idempontent feature.

My producer's configuration is:

enable.idempotence = true

max.in.flight.requests.per.connection = 5

retries = 50

acks = all

According to the documentation:

retries

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

enable.idempotence

When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0 and acks must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.

My configuration seems to be according to the requirements, but they don't seem to align.

Another question I have has to do with the OutOfOrderSequenceException: According to the documentation, if I get this exception it means that the producer is in risk of becoming out of order. But if my producer is configured with max.in.flight.requests.per.connection = 5 and let's say that the second request got the out of order exception, what happens to all of the following requests that are already in flight? will this mean that I am for sure out of order?

解决方案

The ordering is guaranteed when enabling idempotence in the KafkaProducer.

Even if you have max.in.flight.requests.per.connection larger or equal to 1 an idempotent KafkaProducer will still ensure the ordering within a TopicPartition. The description on the "retries" with the relation to the max.in.flight is only applicable if idempotence is disabled.

An idempotent KafkaProducer uses an internal incrementing sequence number to ensure the ordering up to 5 in.flight requests as described in the Pull-Request #3743:

"[...] we retain the record metadata for 5 older batches."

Also, the documentation on the enable.idempotence informs about having at maximum 5 in-flight request. Otherwise you would get a ConfigException.

More details are given in KAFKA-5494 and the corresponding documentation on Design for max.in.flight > 1 with idempotence enabled. where the steps 5 and 6 will clarify your question:

With the above assumptions in place, the solution is as follows:

  1. We keep track of the last acknowledged sequence sent to a partition. This is updated on every successful ack and thus should always be increasing.

  2. We keep track of the next sequence for a batch bound for a given partition.

  3. We assigned the nextSequence when the batch is drained. We also increment the nextSequence by the record count of a batch.

  4. If a produce request succeeds, we set the last ack’d sequence to be the last sequence of the batch.

  5. If a produce request fails, succeeding in flight batches will also fail with an OutOfOrderSequenceException.

  6. As such, if the sequence number of a batch is not the successor of the last ack’d sequence, and if it fails with an OutOfOrderSequenceException, we consider this to be retriable.

  7. When a batch is requeued, we erase the producer id and sequence number before inserting it in the queue.

  8. When the first inflight batch fails (for whatever reason), we reset the nextSequence to be the lastAckdSequence + 1.

  9. Thus if a batch fails fatally, the sequence numbers of succeeding batches will be different on the retry. This is fine, because the previous failure was an OutOfSequence exception, which categorically means that the request was rejected.

"[...] if my producer is configured with max.in.flight.requests.per.connection = 5 and let's say that the second request got the out of order exception, what happens to all of the following requests that are already in flight? will this mean that I am for sure out of order?"

As mentioned in step 5 and 6 all succeeding in-flight requests will also fail with an retriable OutOfOrderSequenceExcpetion. As retries are greater than 0 the idempotent KafkaProducer will be able to keep the order.

这篇关于使用幂等 Kafka Producer 时的排序保证的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆