如何验证 sprng kafka 生产者是否已成功发送消息? [英] How to verify sprng kafka producer has successfully sent message or not?

查看:251
本文介绍了如何验证 sprng kafka 生产者是否已成功发送消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试测试 spring kafka(2.2.5.RELEASE),当生产者使用 kafkatemplate 发送消息时,我想知道该消息是否成功发送.基于此,我想更新该消息 ID 的数据库记录.处理这种情况的最佳做法是什么?

I am trying to test spring kafka(2.2.5.RELEASE) where when producer send message with kafkatemplate, i like to know if that message was sent successfully or not. Based on that I would like to update the db record for that message id. What is the best practice to handle this scenario?

这是检查成功或失败的示例代码

Here is the sample code which checks success or failure

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test_topic", key, userMsg);
    SendResult<String, String> result = null;
    try {
        result = future.get(10000, TimeUnit.MILLISECONDS);
        LOGGER.info("Send Result : {}", result);
        LOGGER.info("Saving entry in db");
        messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
    } catch (Exception e) {
        LOGGER.error("Error publishing message ", e);
        messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
    }

推荐答案

send() 操作返回一个异步完成的 ListenableFuture<>.

The send() operations return a ListenableFuture<> which is completed asynchronously.

如果你想阻塞调用线程来获取结果,使用

If you want to block the calling thread to get the result, use

future.get(timeout, TimeUnit.MILLISECONDS);

这篇关于如何验证 sprng kafka 生产者是否已成功发送消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆