自定义 KafkaConsumerInterceptor 中的注入依赖为 NULL,Spring Cloud Stream 3.0.9.RELEASE [英] Injected dependency in Customized KafkaConsumerInterceptor is NULL with Spring Cloud Stream 3.0.9.RELEASE

查看:91
本文介绍了自定义 KafkaConsumerInterceptor 中的注入依赖为 NULL,Spring Cloud Stream 3.0.9.RELEASE的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在自定义的 ConsumerInterceptor 中注入一个 bean,因为在 Sprint Cloud Stream 3.0.9.RELEASE 中添加了 ConsumerConfigCustomizer.但是,注入的 bean 始终为 NULL.

I want to inject a bean into the customized ConsumerInterceptor as ConsumerConfigCustomizer is added in Sprint Cloud Stream 3.0.9.RELEASE. However, the injected bean is always NULL.

Foo(要注入到 MyConsumerInterceptor 中的依赖项)

Foo (The dependency to be injected into MyConsumerInterceptor)

public class Foo {

    public void foo(String what) {
        System.out.println(what);
    }
}

MyConsumerInterceptor(自定义KafkaConsumerInterceptor)

MyConsumerInterceptor (Customized KafkaConsumerInterceptor)

public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private Foo foo;

    @Override
    public void configure(Map<String, ?> configs) {
        this.foo = (Foo) configs.get("fooBean");    #Comment_1
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        this.foo.foo("consumer interceptor");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    @Override
    public void close() {
    }
}

@SpringBootApplication
public static class ConfigCustomizerTestConfig {


    @Bean
    public ConsumerConfigCustomizer consumerConfigCustomizer() {
        return new ConsumerConfigCustomizer() {
            @Override
            public void configure(Map<String, Object> consumerProperties) {
                consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
                consumerProperties.put("fooBean", foo());
            }
        };
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }
}

应用程序.yml

spring:
  cloud:
    function:
      definition: consume;
    stream:
      function:
        bindings:
          consume-in-0: input
      bindings:
        input:
          destination: students
          group: groupA
          binder: kafkaBinder
      binders:
        kafkaBinder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <IPs>

依赖

    <dependencies>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.0.9.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-function-context</artifactId>
            <version>3.0.11.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.0.9.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
            <version>3.0.9.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-function-context</artifactId>
            <version>3.0.11.RELEASE</version>
        </dependency>
    </dependencies>

日志:

5:19:05.732 [main] INFO  DefaultConfiguringBeanFactoryPostProcessor   - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
15:19:05.735 [main] INFO  DefaultConfiguringBeanFactoryPostProcessor   - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
15:19:05.737 [main] INFO  DefaultConfiguringBeanFactoryPostProcessor   - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
15:19:05.764 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.768 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.773 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.776 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.786 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.787 [main] INFO  PostProcessorRegistrationDelegate$BeanPostProcessorChecker   - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.935 [main] INFO  TomcatWebServer   - Tomcat initialized with port(s): 8080 (http)
15:19:05.940 [main] INFO  Http11NioProtocol   - Initializing ProtocolHandler ["http-nio-8080"]
15:19:05.941 [main] INFO  StandardService   - Starting service [Tomcat]
15:19:05.941 [main] INFO  StandardEngine   - Starting Servlet engine: [Apache Tomcat/9.0.38]
15:19:05.983 [main] INFO  [/]   - Initializing Spring embedded WebApplicationContext
15:19:05.983 [main] INFO  ServletWebServerApplicationContext   - Root WebApplicationContext: initialization completed in 711 ms
15:19:06.119 [main] INFO  ThreadPoolTaskExecutor   - Initializing ExecutorService 'applicationTaskExecutor'
15:19:06.377 [main] INFO  ThreadPoolTaskScheduler   - Initializing ExecutorService 'taskScheduler'
15:19:06.385 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.402 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.403 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.418 [main] INFO  DirectWithAttributesChannel   - Channel 'application.input' has 1 subscriber(s).
15:19:06.421 [main] INFO  SimpleFunctionRegistry   - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.501 [main] INFO  IntegrationMBeanExporter   - Registering MessageChannel nullChannel
15:19:06.514 [main] INFO  IntegrationMBeanExporter   - Registering MessageChannel input
15:19:06.546 [main] INFO  IntegrationMBeanExporter   - Registering MessageChannel errorChannel
15:19:06.566 [main] INFO  IntegrationMBeanExporter   - Registering MessageHandler errorLogger
15:19:06.580 [main] INFO  EventDrivenConsumer   - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
15:19:06.580 [main] INFO  PublishSubscribeChannel   - Channel 'application.errorChannel' has 1 subscriber(s).
15:19:06.581 [main] INFO  EventDrivenConsumer   - started bean '_org.springframework.integration.errorLogger'
15:19:06.581 [main] INFO  DefaultBinderFactory   - Creating binder: kafkaBinder
15:19:06.672 [main] INFO  DefaultBinderFactory   - Caching the binder: kafkaBinder
15:19:06.672 [main] INFO  DefaultBinderFactory   - Retrieving cached binder: kafkaBinder
15:19:06.716 [main] INFO  AdminClientConfig   - AdminClientConfig values: 
    bootstrap.servers = [192.168.86.23:9092]
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

15:19:06.740 [main] INFO  AppInfoParser   - Kafka version: 2.5.1
15:19:06.740 [main] INFO  AppInfoParser   - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.740 [main] INFO  AppInfoParser   - Kafka startTimeMs: 1605302346740
15:19:06.979 [main] INFO  ConsumerConfig   - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.86.23:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = groupA
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

15:19:06.997 [main] INFO  AppInfoParser   - Kafka version: 2.5.1
15:19:06.997 [main] INFO  AppInfoParser   - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.997 [main] INFO  AppInfoParser   - Kafka startTimeMs: 1605302346997
15:19:07.014 [main] INFO  Metadata   - [Consumer clientId=consumer-groupA-1, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 0 subscriber(s).
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO  BinderErrorChannel   - Channel 'students.groupA.errors' has 2 subscriber(s).
15:19:07.038 [main] INFO  ConsumerConfig   - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.86.23:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = groupA
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

15:19:07.040 [main] INFO  AppInfoParser   - Kafka version: 2.5.1
15:19:07.040 [main] INFO  AppInfoParser   - Kafka commitId: 0efa8fb0f4c73d92
15:19:07.040 [main] INFO  AppInfoParser   - Kafka startTimeMs: 1605302347040
15:19:07.041 [main] INFO  KafkaConsumer   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Subscribed to topic(s): students
15:19:07.042 [main] INFO  ThreadPoolTaskScheduler   - Initializing ExecutorService
15:19:07.044 [main] INFO  KafkaMessageDrivenChannelAdapter   - started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6232ffdb
15:19:07.045 [main] INFO  Http11NioProtocol   - Starting ProtocolHandler ["http-nio-8080"]
15:19:07.055 [main] INFO  TomcatWebServer   - Tomcat started on port(s): 8080 (http) with context path ''
15:19:07.059 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  Metadata   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.060 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Discovered group coordinator DESKTOP-8T65SGP.lan:9092 (id: 2147483647 rack: null)
15:19:07.061 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.066 [main] INFO  Application   - Started Application in 2.093 seconds (JVM running for 2.429)
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.079 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  ConsumerCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Finished assignment for group at generation 73: {consumer-groupA-2-4e498561-d307-409b-bb51-09a57eff8b81=Assignment(partitions=[students-0])}
15:19:07.084 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  AbstractCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Successfully joined group with generation 73
15:19:07.086 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  ConsumerCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Adding newly assigned partitions: students-0
15:19:07.094 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  ConsumerCoordinator   - [Consumer clientId=consumer-groupA-2, groupId=groupA] Setting offset for partition students-0 to the committed offset FetchPosition{offset=19, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[DESKTOP-8T65SGP.lan:9092 (id: 0 rack: null)], epoch=0}}
15:19:07.095 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  KafkaMessageChannelBinder$1   - groupA: partitions assigned: [students-0]
15:19:45.344 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO  MessagingMethodInvokerHelper   - Overriding default instance of MessageHandlerMethodFactory with provided one.
Kafka message received---> student info received

#Comment_1:这行代码执行完后,this.foo还是NULL.

#Comment_1: After this line of code is executed, this.foo is still NULL.

#Comment_2:我不认为这个配置是必要的,因为 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG 是在 ConfigCustomizerTestConfig 中指定的.但是,如果从 Application.yml 中删除配置,将不会命中 Comment_1 周围的代码行

#Comment_2: I don't think this config is necessary as ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG is specified in ConfigCustomizerTestConfig. However, the line of code around Comment_1 will not be hit if the config is removed from the Application.yml

我想我的代码中缺少某些东西.任何建议将不胜感激.

I guess something is missing in my code. Any advice will be appreciated.

更新:我在日志中的消费者配置中找不到 fooBean 或 MyConsumerInterceptor.

Updates: I fail to find the fooBean or MyConsumerInterceptor in the consumer configs in logs.

推荐答案

啊 - 您正在使用用于多活页夹支持的命名活页夹;此技术仅适用于单个顶级活页夹.在这种情况下,interceptor.classes 属性为空;这就是您需要将其添加到 YAML 的原因.这是多活页夹支持的一个已知问题.如果您只使用一个活页夹,请不要命名,只需在顶层定义即可.

Ah - you are using a named binder intended for multi-binder support; this technique only works for a single top-level binder. In that case, the interceptor.classes property is empty; that's why you needed to add it to the YAML. This is a known problem with multi-binder support. If you are only using one binder, don't name it, just define it at the top level.

以下是命名活页夹的解决方法 - 覆盖活页夹工厂 bean.不幸的是,我需要复制一些私人代码,但这有效...

The following is a work around for named binders - override the binder factory bean. Unfortunately, I needed to copy some private code, but this works...

@SpringBootApplication
public class So64826496Application {

    public static void main(String[] args) {
        SpringApplication.run(So64826496Application.class, args);
    }

    public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

        private Foo foo;

        @Override
        public void configure(Map<String, ?> configs) {
            this.foo = (Foo) configs.get("fooBean");
            System.out.println(this.foo);
        }

        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            this.foo.foo("consumer interceptor");
            return records;
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

        }

        @Override
        public void close() {
        }
    }

    // BEGIN HACK TO INJECT CUSTOMIZER INTO BINDER

    @Autowired(required = false)
    private Collection<DefaultBinderFactory.Listener> binderFactoryListeners;

    @Bean
    public BinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry,
            BindingServiceProperties bindingServiceProperties) {

        DefaultBinderFactory binderFactory = new DefaultBinderFactory(
                getBinderConfigurations(binderTypeRegistry, bindingServiceProperties),
                binderTypeRegistry) {

                    @Override
                    public synchronized <T> Binder<T, ?, ?> getBinder(String name,
                            Class<? extends T> bindingTargetType) {

                        Binder<T, ?, ?> binder = super.getBinder(name, bindingTargetType);
                        if (binder instanceof KafkaMessageChannelBinder) {
                            ((KafkaMessageChannelBinder) binder).setConsumerConfigCustomizer(consumerConfigCustomizer());
                        }
                        return binder;
                    }


        };
        binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
        binderFactory.setListeners(this.binderFactoryListeners);
        return binderFactory;
    }

    // Had to copy this because it's private in BindingServiceConfiguration
    private static Map<String, BinderConfiguration> getBinderConfigurations(
            BinderTypeRegistry binderTypeRegistry,
            BindingServiceProperties bindingServiceProperties) {

        Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
        Map<String, BinderProperties> declaredBinders = bindingServiceProperties
                .getBinders();
        boolean defaultCandidatesExist = false;
        Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders
                .entrySet().iterator();
        while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
            defaultCandidatesExist = binderPropertiesIterator.next().getValue()
                    .isDefaultCandidate();
        }
        List<String> existingBinderConfigurations = new ArrayList<>();
        for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders
                .entrySet()) {
            BinderProperties binderProperties = binderEntry.getValue();
            if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
                binderConfigurations.put(binderEntry.getKey(),
                        new BinderConfiguration(binderEntry.getKey(),
                                binderProperties.getEnvironment(),
                                binderProperties.isInheritEnvironment(),
                                binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            }
            else {
                Assert.hasText(binderProperties.getType(),
                        "No 'type' property present for custom binder "
                                + binderEntry.getKey());
                binderConfigurations.put(binderEntry.getKey(),
                        new BinderConfiguration(binderProperties.getType(),
                                binderProperties.getEnvironment(),
                                binderProperties.isInheritEnvironment(),
                                binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            }
        }
        for (Map.Entry<String, BinderConfiguration> configurationEntry : binderConfigurations
                .entrySet()) {
            if (configurationEntry.getValue().isDefaultCandidate()) {
                defaultCandidatesExist = true;
            }
        }
        if (!defaultCandidatesExist) {
            for (Map.Entry<String, BinderType> binderEntry : binderTypeRegistry.getAll()
                    .entrySet()) {
                if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
                    binderConfigurations.put(binderEntry.getKey(),
                            new BinderConfiguration(binderEntry.getKey(), new HashMap<>(),
                                    true, "integration".equals(binderEntry.getKey()) ? false : true));
                }
            }
        }
        return binderConfigurations;
    }

    // END HACK

    @Bean
    public ConsumerConfigCustomizer consumerConfigCustomizer() {
        return new ConsumerConfigCustomizer() {
            @Override
            public void configure(Map<String, Object> consumerProperties) {
                consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        MyConsumerInterceptor.class.getName());
                consumerProperties.put("fooBean", foo());
            }
        };
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    @Bean
    Consumer<String> consume() {
        return System.out::println;
    }

}

class Foo {

    void foo(String in) {
        System.out.println(in);
    }

}

还有:

$ kafka-console-producer --bootstrap-server localhost:9092 --topic consume-in-0
>foo

结果:

consumer interceptor
foo

这篇关于自定义 KafkaConsumerInterceptor 中的注入依赖为 NULL,Spring Cloud Stream 3.0.9.RELEASE的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆