KafkaBindingRebalanceListener Bean not autowired by KafkaMessageChannelBinder Bean

Problem description

The documentation is straightforward: it suggests exposing a bean of type KafkaBindingRebalanceListener, whose onPartitionsAssigned method is then called internally. I am trying to do exactly that, but when Spring creates its KafkaMessageChannelBinder bean, ObjectProvider.getIfUnique() always returns null because it cannot find the required bean. It seems that when the application starts, Spring creates its own beans first and cannot find the rebalance listener bean because it has not been created yet. The three code snippets from the project follow. Am I missing something that tells the application to create the beans in the application package before the framework's own beans?

RebalanceListener

package io.spring.dataflow.sample.seekoffset.config;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;

import java.util.Collection;

@Component
public class KafkaRebalanceListener implements KafkaBindingRebalanceListener {
    // Use this class as the logger name so log output is attributed to the listener.
    Logger logger = LoggerFactory.getLogger(KafkaRebalanceListener.class);

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
        logger.debug("onPartitionsAssigned");
    }
}

Configuration class

package io.spring.dataflow.sample.seekoffset.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@EnableBinding(Sink.class)
public class SeekOffsetConfig {
    Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);

    @StreamListener(Sink.INPUT)
    public void receiveMessage(Message<String> message) {
        logger.debug("receiveMessage()");
    }
}

Application class

package io.spring.dataflow.sample.seekoffset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SeekOffsetApplication {
    Logger logger = LoggerFactory.getLogger(SeekOffsetApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(SeekOffsetApplication.class, args);
    }
}

Recommended answer

Which version are you using? This works fine for me with Boot 2.3.2 and Hoxton.SR6:

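import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;

@SpringBootApplication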
@EnableBinding(Sink.class)
public class So63157778Application {

    public static void main(String[] args) {
        SpringApplication.run(So63157778Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    KafkaBindingRebalanceListener rebal() {
        return new KafkaBindingRebalanceListener() {

            @Override
            public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions, boolean initial) {
                System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
            }

        };
    }

}

input assignments: [input-0], initial call :true

This also works for me:

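import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@SpringBootApplication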
@EnableBinding(Sink.class)
public class So63157778Application {

    public static void main(String[] args) {
        SpringApplication.run(So63157778Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
    }

}

@Component
class Foo implements KafkaBindingRebalanceListener {

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
    }

}
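
For readers puzzled by the null in the question: ObjectProvider.getIfUnique() returns the bean only when exactly one candidate is available, and null when none is found or when several are found with none marked as primary. The sketch below is a plain-Java model of that rule, not Spring code. The class GetIfUniqueSketch, its getIfUnique helper and the bean names are hypothetical, and @Primary handling is deliberately left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model of the "single candidate or null" lookup rule.
 * Hypothetical illustration only; this is not how Spring implements it.
 */
public class GetIfUniqueSketch {

    /** Returns the only candidate, or null when there are zero or several. */
    public static <T> T getIfUnique(List<T> candidates) {
        return candidates.size() == 1 ? candidates.get(0) : null;
    }

    public static void main(String[] args) {
        // Hypothetical bean names standing in for listener beans.
        List<String> none = Collections.emptyList();
        List<String> one = Arrays.asList("rebal");
        List<String> two = Arrays.asList("rebal", "foo");

        // No listener registered: the caller receives null.
        System.out.println("none -> " + getIfUnique(none)); // none -> null
        // Exactly one listener registered: it is returned.
        System.out.println("one  -> " + getIfUnique(one));  // one  -> rebal
        // Two listeners and no primary: ambiguous, so null again.
        System.out.println("two  -> " + getIfUnique(two));  // two  -> null
    }
}
```

If the lookup returns null in a real application, it may be worth checking that exactly one KafkaBindingRebalanceListener bean is registered and that its package is covered by component scanning.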
