每个Http请求有多个Kafka生产者实例 [英] Multiple Kafka Producer Instance for each Http Request

查看:17
本文介绍了每个Http请求有多个Kafka生产者实例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。

如何高效地为每个HTTP请求创建新的Kafka Producer实例?

//Kafka Transaction enabled
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1-" );

@Service
public class ProducerService {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    public void postMessage(final MyUser message) {
        // wrapping the send method in a transaction
        this.kafkaTemplate.executeInTransaction(kafkaTemplate -> {
              kafkaTemplate.send("custom", null, message);
        }

    }

推荐答案

请参阅DefaultKafkaProducerFactory的javadoc。它为生产者发起的事务维护生产者缓存。

/**
 * The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance.
 * <p>
 * This implementation will return the same {@link Producer} instance (if transactions are
 * not enabled) for the provided {@link Map} {@code configs} and optional {@link Serializer}
 * implementations on each {@link #createProducer()} invocation.

...

 * Setting {@link #setTransactionIdPrefix(String)} enables transactions; in which case, a
 * cache of producers is maintained; closing a producer returns it to the cache. The
 * producers are closed and the cache is cleared when the factory is destroyed, the
 * application context stopped, or the {@link #reset()} method is called.

...

 */

这篇关于每个Http请求有多个Kafka生产者实例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆