Transactional Producer vs Just Idempotent Producer Java (Exception OutOfOrderSequenceException)

Problem description

I use spring-kafka with an idempotent producer configuration.

These are my configuration properties:

    // Imports this snippet relies on (Joiner is assumed to be Guava's)
    import java.util.Properties;
    import com.google.common.base.Joiner;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SslConfigs;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers()));

    // Configure the following three settings for SSL encryption
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation());
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProps.getJksPassword());

    // Idempotent producer settings; note that retries is set explicitly here
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.RETRIES_CONFIG, 5);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

My Kafka producer throws an OutOfOrderSequenceException:

    2019-03-06 21:25:47 Sender [ERROR] [Producer clientId=producer-1] The broker returned org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number for topic-partition topic-1 at offset -1. This indicates data loss on the broker, and should be investigated.
    2019-03-06 21:25:47 TransactionManager [INFO] [Producer clientId=producer-1] ProducerId set to -1 with epoch -1
    2019-03-06 21:25:47 ProducerKafka [ERROR] we encountered error while sending to kafka, please retry the job

I am not sure why this exception is being thrown. I couldn't find a concrete answer to this. The official javadoc for the exception states the following:

This exception indicates that the broker received an unexpected sequence number from the producer, which means that data may have been lost. If the producer is configured for idempotence only (i.e. if enable.idempotence is set and no transactional.id is configured), it is possible to continue sending with the same producer instance, but doing so risks reordering of sent records. For transactional producers, this is a fatal error and you should close the producer.

Does that mean I need to use a transactional producer to avoid this issue?

The KafkaProducer documentation states something that makes the above statement ambiguous: https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will be defaulted to Integer.MAX_VALUE, the max.in.flight.requests.per.connection config will be defaulted to 1, and acks config will be defaulted to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.

The statement above says that all I need for an idempotent producer is the enable.idempotence property. However, the exception javadoc seems to say that I have to set the transactional.id property.

What is the right way to create an idempotent async producer without having to deal with the fatal OutOfOrderSequenceException?

Solution

If you explicitly set retries, then you must also set

max.in.flight.requests.per.connection=1

in order to avoid out-of-order issues.
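As a minimal sketch of that advice, the properties could be assembled as below. The config keys are Kafka's own producer setting names; the class name `IdempotentProducerProps` and the `build` helper are hypothetical and exist only for this illustration. The sketch leaves `retries` unset, as the javadoc quoted in the question recommends, and sets `max.in.flight.requests.per.connection` explicitly so the intent is visible. It uses plain string keys so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka or spring-kafka.
final class IdempotentProducerProps {

    // Builds producer properties for an idempotent (non-transactional) producer.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Idempotence requires acks=all.
        props.put("enable.idempotence", "true");
        props.put("acks", "all");

        // The setting named in the answer: one in-flight request per connection.
        props.put("max.in.flight.requests.per.connection", "1");

        // "retries" is deliberately not set, so the client default applies.
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application the resulting `Properties` object would be passed to the `KafkaProducer` constructor, together with the SSL settings from the question.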

The documentation for the retries setting states this very clearly:

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
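The reordering described in that passage can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not Kafka's actual sender logic: batches are sent in windows of at most `maxInFlight`, one named batch fails exactly once, and a failed batch is retried only after the rest of its window has been appended. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of in-flight batches and retries; not Kafka code.
final class InFlightReorderingDemo {

    // Returns the order in which batches end up in the partition log.
    static List<String> appendOrder(List<String> batches, int maxInFlight, String failsOnce) {
        List<String> log = new ArrayList<>();
        Deque<String> pending = new ArrayDeque<>(batches);
        boolean alreadyFailed = false;

        while (!pending.isEmpty()) {
            // Put up to maxInFlight batches on the wire at once.
            List<String> window = new ArrayList<>();
            while (window.size() < maxInFlight && !pending.isEmpty()) {
                window.add(pending.poll());
            }

            List<String> retry = new ArrayList<>();
            for (String batch : window) {
                if (batch.equals(failsOnce) && !alreadyFailed) {
                    alreadyFailed = true; // transient failure on the first attempt
                    retry.add(batch);
                } else {
                    log.add(batch);       // appended successfully
                }
            }

            // Failed batches go back to the front of the queue for a retry.
            for (int i = retry.size() - 1; i >= 0; i--) {
                pending.addFirst(retry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = Arrays.asList("A", "B");
        System.out.println(appendOrder(batches, 2, "A")); // prints [B, A]
        System.out.println(appendOrder(batches, 1, "A")); // prints [A, B]
    }
}
```

With two batches in flight, the retried batch A lands after B. With a single in-flight request, B is not sent until A has succeeded, so the order is preserved.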
