Kafka - 如何在 Producer 类中获取失败的消息详细信息 [英] Kafka - How to obtain failed messages details in Producer class

查看:20
本文介绍了Kafka - 如何在 Producer 类中获取失败的消息详细信息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Kafka 允许通过 Producer (KafkaProducer) 类上的以下方法异步发送消息:

Kafka allows for asynchronous message sending through below methods on Producer (KafkaProducer) class:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record) 
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

成功可以通过
1) Future 对象或
2) 回调调用的 onCompletion 方法.onCompletion 的完整方法签名和用法如下(取自 kafka 文档)

Successes can be handled through
1) the Future<RecordMetaData> object or
2) onCompletion method invoked by the callback. Full method signature and usage of onCompletion is as below (taken from kafka docs)

`

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);  
producer.send(record,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

虽然失败需要通过传递给 onCompletion 方法的 Exception e 处理

While failure needs to be handled through the Exception e passed to the onCompletion method

很好,到目前为止,一切看起来都不错.

Fine every thing looks good so far.

但是,如果我说得对,可以从 exceptione 对象获得的任何合理信息都是堆栈跟踪和异常消息.我在这里要指出的是,e 不包含实际发送记录的任何信息.或者换句话说,它不包含对发送给 kafka 代理的实际 record 的引用.那么如果 record 没有成功发送,生产者可以进行哪些有用的处理或处理.真的不多.为什么我这么说 - 理想情况下,我想在某个地方记录失败的消息,然后尝试重新发送它.但是由于框架提供的信息很少(e),我觉得这是不可能的.

But if I am getting it right, any reasonable information that can be obtained from exception or e object is stacktrace and exception message. What I mean to point out here is, e does not contain any information of the actual record sent. Or in other words, it does not contain a reference to the actual record that was sent to kafka broker. So what useful processing or handling can be done by the producer if the record was not sent successfully. Really not much. Why I say this is - ideally I would like to make a log of the failed message some where and then try to resend it. But with the little information (e) provided by framework, i feel this is not possible.

有人能指出我是对还是错吗?

Can someone point out if I am right or wrong?

推荐答案

您可以轻松创建接收 producerRecord 作为构造函数参数的回调.因此,在 onCompletion 出现异常时,您可以完全了解生产者记录,甚至可以尝试再次发送.

You could easily create a callback that receives the producerRecord as a constructor argument. So upon onCompletion with an exception, you can have complete knowledge of the producer record, and even try to send it again.

我处理了同样的问题.创建了一个获取 producerRecord 的回调,以及一个使用执行程序服务再次发送记录的回调处理程序.因此,最终,我可以容忍任意数量的故障(例如网络问题或 kafka 停机),并从中恢复.

I dealt with the same issue. Created a callback that gets both producerRecord, and a callback handler that uses an executor service to send the record again. So eventually, I can tolerate any number of failures (e.g. network issues or kafka is down), and recover from it.

这篇关于Kafka - 如何在 Producer 类中获取失败的消息详细信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆