Spring Kafka异步发送调用块 [英] Spring Kafka asynchronous send calls block

查看:460
本文介绍了Spring Kafka异步发送调用块的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Spring-Kafka版本1.2.1,并且当Kafka服务器关闭/无法访问时,异步发送调用会阻塞一段时间.似乎是TCP超时.代码是这样的:

I'm using Spring-Kafka version 1.2.1 and, when the Kafka server is down/unreachable, the asynchronous send calls block for a time. It seems to be the TCP timeout. The code is something like this:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
    @Override
    public void onSuccess(SendResult<K, V> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }
});

我快速看了一下Spring-Kafka代码,它似乎只是将任务传递给kafka客户端库,将回调交互转换为将来的对象交互.查看kafka客户端库,代码变得更加复杂,我没有花时间去理解所有内容,但是我想它可能是在同一线程中进行远程调用(至少是元数据).

I've taken a really quick look at the Spring-Kafka code and it seems to just pass the task along to the kafka client library, translating a callback interaction to a future object interaction. Looking at the kafka client library, the code gets more complex and I didn't take the time to understand it all, but I guess it may be making remote calls (metadata, at least?) in the same thread.

作为用户,我希望即使将来无法访问远程kafka服务器,返回将来的Spring-Kafka方法也会立即返回.

As a user, I expected the Spring-Kafka methods that return a future to return immediately, even if the remote kafka server is unreachable.

欢迎确认我的理解是错误的还是错误.我现在暂时结束了对它的异步操作.

Any confirmation if my understanding is wrong or if this is a bug would be welcome. I ended up making it asynchronous on my end for now.

另一个问题是,Spring-Kafka文档在开始时就说它提供了同步和异步发送方法.我找不到任何不返回期货的方法,也许文档需要更新.

Another problem is that Spring-Kafka documentation says, at the beginning, that it provides synchronous and asynchronous send methods. I couldn't find any methods that do not return futures, maybe the documentation needs updating.

如果需要,我很乐意提供更多详细信息.谢谢.

I'm happy to provide any further details if needed. Thanks.

推荐答案

除了配置类上的@EnableAsync注释外,在调用此代码时,还需要在方法上使用@Async注释.

In addition to the @EnableAsync annotation on a configuration class, the @Async annotation needs to be used on the method were you invoke this code.

http://www.baeldung.com/spring-async

这里有一些代码问题. Kafka生产者配置:

Here some code fragements. Kafka producer config:

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}

制作人本身:

public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

这篇关于Spring Kafka异步发送调用块的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆