Spring @KafkaListener和并发 [英] Spring @KafkaListener and concurrency

查看:25
本文介绍了Spring @KafkaListener和并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用spring boot + spring @KafkaListener.我期望的行为是:我的kafka侦听器读取10个线程中的消息.这样,如果其中一个线程挂起,则其他消息将继续读取和处理消息.

我定义的

  @Beanpublic ConcurrentKafkaListenerContainerFactory<?,?>kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer配置器,ConsumerFactory<对象,对象>kafkaConsumerFactory){ConcurrentKafkaListenerContainerFactory<对象,对象>factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory,kafkaConsumerFactory);factory.getContainerProperties().setMissingTopicsFatal(false);factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);返回工厂;} 

和spring boot配置:

  spring.kafka.listener.concurrency = 10 

我看到所有配置都能正常工作,我在jmx中看到了10个线程:

但是我进行了这样的测试:

  @KafkaListener(topics = {"$ {topic.name}"},clientIdPrefix ="$ {kafka.client.id.prefix}",idIsGroup = false,id ="$ {kafka.listener.name}",containerFactory ="kafkaListenerContainerFactory")公共无效的监听(ConsumerRecord< String,String>记录){if(record.getVersion()< 3){尝试 {线程.睡眠(20000);} catch(InterruptedException e){e.printStackTrace();}}别的System.out.println(有效!");} 

如果版本是<3,然后挂,否则-工作.我发送了1,2和3版本的3条消息.我希望版本1和2的消息会挂起,但是版本3将在侦听器时处理.但是不幸的是,版本3的消息在开始处理之前会等待消息1和2.

也许我的期望不是真的,这是kafka监听器的正确行为.请帮助我处理kafka并发,为什么会这样呢?

解决方案

Kafka不能那样工作;您至少需要与使用者一样多的分区(由spring容器中的 concurrency 控制).

此外,一次(组中)只有一个使用者(一次使用)可以从一个分区中进行使用,因此,即使您增加了分区,在卡住"的使用者之后的同一分区中的记录也不会被其他使用者接收./p>

I am working with spring boot + spring @KafkaListener. And the behavior I expect is: my kafka listener reads messages in 10 threads. So that, if one of threads hangs, other messages are would continue reading and handling messages.

I defined bean of

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
{

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setMissingTopicsFatal(false);
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    return factory;
}

And spring boot config:

spring.kafka.listener.concurrency=10

I see that all configs work, I see my 10 threads in jmx:

But then I make such test:

 @KafkaListener(topics = {
            "${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false, id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record)
    {
        if(record.getVersion() < 3) {
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        else
            System.out.println("It works!");

    }

If version is < 3, then hang, otherwise - work. I send 3 messages with version 1,2 and 3. I expect that messages with version 1 and 2 will hang, but version 3 will be processed at the time it comes to listener. But unfortunately message with version 3 waits for messages 1 and 2 before starts its processing.

Maybe my expectations are not true and this is a right behavior of kafka listener. Please help me to deal with kafka concurrency, why does it act like that?

解决方案

Kafka doesn't work that way; you need at least as many partitions as consumers (controlled by concurrency in the spring container).

Also, only one consumer (in a group) can consume from a partition at a time so, even if you increase the partitions, records in the same partition behind the "stuck" consumer will not be received by other consumers.

这篇关于Spring @KafkaListener和并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆