Kafka producer callback Exception


Problem Description

When we produce messages we can define a callback, this callback can expect an exception:

kafkaProducer.send(producerRecord, new Callback() {
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e == null) {
      // OK
    } else {
      // NOT OK
    }
  }
});

Considering the built-in retry logic in the producer, which kinds of exceptions should developers handle explicitly?

Solution

According to the Callback Java Docs, the following exceptions may occur during the callback:

The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:

Non-Retriable exceptions (fatal, the message will never be sent):

  • InvalidTopicException
  • OffsetMetadataTooLargeException
  • RecordBatchTooLargeException
  • RecordTooLargeException
  • UnknownServerException

Retriable exceptions (transient, may be covered by increasing #.retries):

  • CorruptRecordException
  • InvalidMetadataException
  • NotEnoughReplicasAfterAppendException
  • NotEnoughReplicasException
  • OffsetOutOfRangeException
  • TimeoutException
  • UnknownTopicOrPartitionException
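To act on this split in a callback, you would normally check `e instanceof RetriableException` (all retriable producer exceptions extend `org.apache.kafka.common.errors.RetriableException`). As a self-contained sketch that does not require kafka-clients on the classpath, the same classification can be mimicked by matching the class simple names from the lists above; the class and method names here are hypothetical:

```java
import java.util.Set;

public class CallbackErrorClassifier {
    // Simple names taken from the retriable list above. This is a stand-in
    // for `e instanceof RetriableException` when kafka-clients is available.
    private static final Set<String> RETRIABLE = Set.of(
            "CorruptRecordException", "InvalidMetadataException",
            "NotEnoughReplicasAfterAppendException", "NotEnoughReplicasException",
            "OffsetOutOfRangeException", "TimeoutException",
            "UnknownTopicOrPartitionException");

    static boolean isRetriable(Exception e) {
        return RETRIABLE.contains(e.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        // java.util.concurrent.TimeoutException shares the simple name
        // used by Kafka's org.apache.kafka.common.errors.TimeoutException.
        System.out.println(isRetriable(new java.util.concurrent.TimeoutException())); // true
        System.out.println(isRetriable(new IllegalArgumentException()));              // false
    }
}
```

In real code, a retriable exception reaching the callback means the producer has already exhausted its retries (or `delivery.timeout.ms` expired), so "retriable" at that point mostly informs logging and alerting, not another automatic resend.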

Maybe this is an unsatisfactory answer, but in the end, which exceptions to handle and how to handle them depends entirely on your use case and business requirements.

Handling Producer Retries

However, as a developer you also need to deal with the Kafka producer's own retry mechanism. Retries are mainly driven by:

retries: Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection (default: 5) to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.

retry.backoff.ms: The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

request.timeout.ms: The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.
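Following that advice, a minimal producer configuration would leave `retries`, `retry.backoff.ms`, and `request.timeout.ms` at their defaults and set only the overall bound. A sketch (the broker address is a placeholder, and `delivery.timeout.ms` is shown at its default of 120000 ms purely for illustration):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Leave retries/retry.backoff.ms/request.timeout.ms unset (defaults);
        // bound the whole send attempt instead:
        props.put("delivery.timeout.ms", "120000"); // the default, shown explicitly
        return props;
    }
}
```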

The recommendation is to keep the default values of those three configurations above and rather focus on the hard upper time limit defined by

delivery.timeout.ms: An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed for retriable send failures. The producer may report failure to send a record earlier than this config if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline. The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms.
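The constraint in that last sentence is easy to check mechanically. A small sketch (method name is hypothetical; the values in `main` are Kafka's documented defaults: 120000, 30000, and 0 ms):

```java
// Sanity-check producer timeout configs against the documented constraint:
// delivery.timeout.ms >= request.timeout.ms + linger.ms
public class ProducerTimeoutCheck {
    static boolean deliveryTimeoutIsValid(long deliveryTimeoutMs,
                                          long requestTimeoutMs,
                                          long lingerMs) {
        return deliveryTimeoutMs >= requestTimeoutMs + lingerMs;
    }

    public static void main(String[] args) {
        // Defaults: delivery.timeout.ms=120000, request.timeout.ms=30000, linger.ms=0
        System.out.println(deliveryTimeoutIsValid(120_000, 30_000, 0)); // true
        System.out.println(deliveryTimeoutIsValid(20_000, 30_000, 0));  // false
    }
}
```

If the constraint is violated, the producer itself rejects the configuration at startup, so the check above mirrors what the client enforces.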
