如何有效地将@KafkaListener 绑定到 ConcurrentKafkaListenerContainerFactory? [英] How can I effectively bind my @KafkaListener to ConcurrentKafkaListenerContainerFactory?

查看:258
本文介绍了如何有效地将@KafkaListener 绑定到 ConcurrentKafkaListenerContainerFactory?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到了这个对我来说很奇怪的场景:

I hit this scenario which appears strange to me:

所以基本上我在一个类中定义了两个 @KafkaListener:

So basically I have defined two @KafkaListener in one class:

@KafkaListener(id = "listener1", idIsGroup = false, topics = "data1", containerFactory = "kafkaListenerContainerFactory")
    public void receive(){}

@KafkaListener(id = "listener2", idIsGroup = false, topics = "data2", containerFactory = "kafkaListenerContainerFactory2")
    public void receive(){}

它们的idtopicscontainerFactory是不同的,每个都依赖不同的ConcurrentKafkaListenerContainerFactory作为在另一个类中定义:

Their id, topics, containerFactory are different, and each of them relies on a different ConcurrentKafkaListenerContainerFactory as defined in another class:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory("group1", "earliest"));
    factory.setAutoStartup(false);
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory("group2", "latest"));
    factory.setAutoStartup(true);
    return factory;
}

@Bean
public ConsumerFactory<String, ConsumerRecord> consumerFactory(String groupId, String offset) {
    Map<String, Object> config = new HashMap<>();
    // dt is current timestamp in millisecond (epoch)
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + dt);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
    // other config omitted
    return new DefaultKafkaConsumerFactory<>(config);
}

所以我期望看到的(以及我想要实现的)是:

So what I expect to see (and what I want to achieve) are:

  1. 只有 listener2 会自动启动,因为factory.setAutoStartup(true)
  2. Listener2 将以 group.id "group2" 和 auto.offset.reset "latest" 开头
  3. 稍后当 listener1 通过某个事件侦听器启动时,它将启动使用 group.id "group1" 和 auto.offset.reset "earlist"
  1. Only listener2 will auto-start because factory.setAutoStartup(true)
  2. Listener2 will start with group.id "group2" and auto.offset.reset "latest"
  3. Later when listener1 starts via some event listener, it will start with group.id "group1" and auto.offset.reset "earlist"

然而,实际上只有第一个得到保证.Listener2 可以以 {group2 + latest} 或 {group1 + early} 开头.然后当listener1开始消费数据时,它只会重用listener2的配置(我可以看到在我的日志中打印了两次包含时间戳的相同组ID)

However, only the 1st is actually guaranteed. Listener2 can start with either {group2 + latest} or {group1 + earliest}. And later when listener1 starts to consume data, it will just reuse the config of listener2 (I can see the same group id which contains a timestamp is printed twice in my log)

我的问题是,为什么 listener2 的组 ID 和偏移配置是随机选择的,而 autoStartup 不是?为什么listener1会重用listener2的配置?

My question is, why the group ID and offset config for listener2 are randomly picked while autoStartup is not? And why listener1 will reuse the config of listener2?

推荐答案

这是因为 consumerFactory 是一个单例 @Bean 并且参数在第二次调用时被忽略.

It's because consumerFactory is a singleton @Bean and the arguments are ignored on the second call.

@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 添加到工厂中,每次都会得到一个新的 bean.

Add @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) to the factory get a new bean each time.

但是,您不需要任何这些,您可以简单地在注释上设置 groupId 属性并避免所有这些额外的定义.

However, you don't need any of this, you can simply set the groupId property on the annotations and avoid all this extra definition.

您还可以控制注释上的 autoStartup(自 2.2 起).

You can also control autoStartup on the annotation (since 2.2).

编辑

要回答下面评论中的问题...

To answer the question in the comment below...

groupId = "#{'${group.id}' + T(java.time.Instant).now().toEpochMilli()}"

但是,如果您想要一个唯一的组 ID;这更可靠...

however, if you want a unique group id; this is more reliable...

groupId = "#{'${group.id}' + T(java.util.UUID).randomUUID()}"

这篇关于如何有效地将@KafkaListener 绑定到 ConcurrentKafkaListenerContainerFactory?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆