Spring Kafka 支持多个消费者的要求 [英] Spring Kafka Requirements for Supporting Multiple Consumers

查看:44
本文介绍了Spring Kafka 支持多个消费者的要求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

正如人们所期望的那样,希望不同的消费者以不同的方式在 Kafka 中以不同的方式反序列化主题.spring boot autoconfig存在一个已知问题.似乎一旦定义了其他工厂,Spring Kafka 或自动配置就会抱怨无法再找到合适的消费者工厂.有人指出,一种解决方案是在配置中包含类型为 (Object, Object) 的 ConsumerFactory.但是没有人展示过它的源代码,也没有人说明它是否需要以任何特定的方式命名.或者,如果简单地将此消费者添加到配置中,则无需关闭自动配置.所有这些都还很不清楚.

As one would expect its common to want to have different Consumers deserializing in different ways off topics in Kafka. There is a known problem with spring boot autoconfig. It seems that as soon as other factories are defined Spring Kafka or the autoconfig complains about not being able to find a suitable consumer factory anymore. Some have pointed out that one solution is to include a ConsumerFactory of type (Object, Object) in the config. But no one has shown the source code for this or clarified if it needs to be named in any particular way. Or if simply adding this Consumer to the config removes the need to turn off autoconfig. All that remains very unclear.

如果你不熟悉这个问题,请阅读https://github.com/spring-projects/spring-boot/issues/19221

If you are not familiar with this issue please read https://github.com/spring-projects/spring-boot/issues/19221

在刚刚声明好的地方,定义 ConsumerFactory 并将其添加到您的配置中的某个位置.有人可以更精确一点吗.

Where it was just stated ok, define the ConsumerFactory and add it somewhere in your config. Can someone be a bit more precise about this please.

  1. 准确展示如何定义 ConsumerFactory 以便 Spring boot autoconfig 不会抱怨.
  2. 解释是否需要关闭自动配置?
  3. 解释消费者工厂是否需要以任何特殊方式命名.

推荐答案

最简单的解决方案是坚持使用 Boot 的自动配置并覆盖 @KafkaListener 本身上的反序列化器...

The simplest solution is to stick with Boot's auto-configuration and override the deserializer on the @KafkaListener itself...

@SpringBootApplication
public class So63108344Application {

    public static void main(String[] args) {
        SpringApplication.run(So63108344Application.class, args);
    }

    @KafkaListener(id = "so63108344-1", topics = "so63108344-1")
    public void listen1(String in) {
        System.out.println(in);
    }

    @KafkaListener(id = "so63108344-2", topics = "so63108344-2", properties =
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
                "=org.apache.kafka.common.serialization.ByteArrayDeserializer")
    public void listen2(byte[] in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63108344-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so63108344-2").partitions(1).replicas(1).build();
    }

}

对于更高级的容器定制(或者如果你不想污染@KafkaListener,你可以使用一个ContainerCustomizer...

For more advanced container customization (or if you don't want to pollute the @KafkaListener, you can use a ContainerCustomizer...

@Component
class Customizer {
    
    public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.setContainerCustomizer(container -> {
            if (container.getGroupId().equals("so63108344-2")) {
                container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
                container.getContainerProperties().getKafkaConsumerProperties()
                    .setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            }
        });
    }
    
}

这篇关于Spring Kafka 支持多个消费者的要求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆