Kafka春季对支持多个消费者的要求 [英] Spring Kafka Requirements for Supporting Multiple Consumers

查看:24
本文介绍了Kafka春季对支持多个消费者的要求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

正如人们期望的那样,它的共同点是希望让不同的消费者以不同的方式反序列化Kafka中的主题.spring boot autoconfig存在一个已知问题.似乎一旦定义了其他工厂,Spring Kafka或autoconfig就会抱怨不再能够找到合适的消费工厂.有人指出,一种解决方案是在配置中包含类型为(对象,对象)的ConsumerFactory.但是,没有人显示此源代码,也没有澄清是否需要以任何特定方式命名.或者,如果只是简单地将此使用方添加到配置中,则无需关闭自动配置.所有这些都还不清楚.

As one would expect its common to want to have different Consumers deserializing in different ways off topics in Kafka. There is a known problem with spring boot autoconfig. It seems that as soon as other factories are defined Spring Kafka or the autoconfig complains about not being able to find a suitable consumer factory anymore. Some have pointed out that one solution is to include a ConsumerFactory of type (Object, Object) in the config. But no one has shown the source code for this or clarified if it needs to be named in any particular way. Or if simply adding this Consumer to the config removes the need to turn off autoconfig. All that remains very unclear.

如果您不熟悉此问题,请阅读 https://github.com/spring-projects/spring-boot/issues/19221

If you are not familiar with this issue please read https://github.com/spring-projects/spring-boot/issues/19221

在没有问题的地方,定义ConsumerFactory并将其添加到配置中的某个位置.有人可以更精确一点吗?

Where it was just stated ok, define the ConsumerFactory and add it somewhere in your config. Can someone be a bit more precise about this please.

  1. 确切显示如何定义ConsumerFactory,以使Spring boot autoconfig不会出错.
  2. 请问是否需要关闭自动配置?
  3. 说明是否需要以任何特殊方式命名消费工厂".

推荐答案

最简单的解决方案是坚持使用Boot的自动配置,并在 @KafkaListener 本身上覆盖反序列化器...

The simplest solution is to stick with Boot's auto-configuration and override the deserializer on the @KafkaListener itself...

@SpringBootApplication
public class So63108344Application {

    public static void main(String[] args) {
        SpringApplication.run(So63108344Application.class, args);
    }

    @KafkaListener(id = "so63108344-1", topics = "so63108344-1")
    public void listen1(String in) {
        System.out.println(in);
    }

    @KafkaListener(id = "so63108344-2", topics = "so63108344-2", properties =
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
                "=org.apache.kafka.common.serialization.ByteArrayDeserializer")
    public void listen2(byte[] in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63108344-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so63108344-2").partitions(1).replicas(1).build();
    }

}

要进行更高级的容器自定义(或者如果您不想污染 @KafkaListener ,则可以使用 ContainerCustomizer ...

For more advanced container customization (or if you don't want to pollute the @KafkaListener, you can use a ContainerCustomizer...

@Component
class Customizer {
    
    public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.setContainerCustomizer(container -> {
            if (container.getGroupId().equals("so63108344-2")) {
                container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
                container.getContainerProperties().getKafkaConsumerProperties()
                    .setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            }
        });
    }
    
}

这篇关于Kafka春季对支持多个消费者的要求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆