Kafka Producer:使用回调处理异步发送中的异常 [英] Kafka Producer : Handle Exception in Async Send with Callback

查看:82
本文介绍了Kafka Producer:使用回调处理异步发送中的异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要在异步发送到 Kafka 的情况下捕获异常.Kafka 生产者 Api 带有一个函数 send(ProducerRecord record, Callback callback).但是当我针对以下两种情况对此进行测试时:

  • Kafka Broker 关闭
  • 主题未预先创建回调没有被调用.相反,我在代码中收到了发送失败的警告(如下所示).

问题:

  • 那么回调是否只针对特定异常调用?

  • Kafka 客户端何时在异步发送时尝试连接到 Kafka 代理:在每次批量发送时还是定期发送?

因此,当出现问题并且异步发送不成功时,它将最终失败并导致未来失败或/和回调异常.如果您不是以事务方式运行它,则仍然可能意味着批处理中的某些消息已找到到达代理的方式,而其他消息则没有.如果每条发送到 Kafka 的消息都需要对上游系统(如 http 摄取接口等)进行阻塞式确认,那肯定会是一个问题.做到这一点的唯一方法是使用未来的 get 阻止每条消息,如

总的来说,我注意到很多与 KafkaProducer 交付语义和保证相关的问题.绝对可以更好地记录它.

还有一件事,因为你提到了 linger.ms:

<块引用>

请注意,到达时间相近的记录通常会即使与 linger.ms=0 一起批处理,因此在重负载下批处理将无论延迟配置如何,都会发生

I need to catch the exceptions in case of Async send to Kafka. The Kafka producer Api comes with a fuction send(ProducerRecord record, Callback callback). But when I tested this against following two scenarios :

  • Kafka Broker Down
  • Topic not pre created The callbacks are not getting called. Rather I am getting warning in the code for unsuccessful send (as shown below).

Questions :

  • So are the callbacks called only for specific exceptions ?

  • When does Kafka Client try to connect to Kafka broker while async send : on every batch send or periodically ?

Kafka Warning Image

Note : I am also using linger.ms setting of 25 sec to batch send my records.


public class ProducerDemo {

    static KafkaProducer<String, String> producer;

    public static void main(String[] args) throws IOException {

         final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");

        producer = new KafkaProducer<String, String>(properties);
        String topic = "first_topic";

        for (int i = 0; i < 5; i++) {
            String value = "hello world " + Integer.toString(i);
            String key = "id_" + Integer.toString(i);

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);

              producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        //execute everytime a record is successfully sent or exception is thrown
                        if(e == null){
                           // No Exception
                        }else{
                            //Exception Handling
                        }
                    }
                });
        }
        producer.close();
    }

解决方案

You will get those warning for non-existing topic as a resilience mechanism provided with KafkaProducer. If you wait a bit longer(should be 60 seconds by default), the callback will be called eventually: Here's my snippet:

So, when something goes wrong and async send is not successful, it will eventually fail with a failed future or/and a callback with exception. If you are not running it transactionally, it can still mean that some messages from the batch have found their way to the broker, while others haven't. It will most certainly be a problem if you need a blocking-style acknowledgement to the upstream system(like http ingestion interface, etc.) per every message that is sent to Kafka. The only way to do that is by blocking every message with the future's get, as described in the documentation:

In general, I've noticed a lot of question related to KafkaProducer delivery semantics and guarantees. It can definitely be documented better.

One more thing, since you mentioned linger.ms:

Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration

这篇关于Kafka Producer:使用回调处理异步发送中的异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆