Injected dependency in customized KafkaConsumerInterceptor is NULL with Spring Cloud Stream 3.0.9.RELEASE
Question
I want to inject a bean into the customized ConsumerInterceptor, since ConsumerConfigCustomizer was added in Spring Cloud Stream 3.0.9.RELEASE. However, the injected bean is always NULL.
Foo (the dependency to be injected into MyConsumerInterceptor)
public class Foo {

    public void foo(String what) {
        System.out.println(what);
    }

}
MyConsumerInterceptor (customized KafkaConsumerInterceptor)
public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private Foo foo;

    @Override
    public void configure(Map<String, ?> configs) {
        this.foo = (Foo) configs.get("fooBean"); // Comment_1
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        this.foo.foo("consumer interceptor");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

}
@SpringBootApplication
public static class ConfigCustomizerTestConfig {

    @Bean
    public ConsumerConfigCustomizer consumerConfigCustomizer() {
        return new ConsumerConfigCustomizer() {

            @Override
            public void configure(Map<String, Object> consumerProperties) {
                consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
                consumerProperties.put("fooBean", foo());
            }

        };
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

}
application.yml
spring:
  cloud:
    function:
      definition: consume;
    stream:
      function:
        bindings:
          consume-in-0: input
      bindings:
        input:
          destination: students
          group: groupA
          binder: kafkaBinder
      binders:
        kafkaBinder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <IPs>
Dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>3.0.9.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-context</artifactId>
        <version>3.0.11.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>3.0.9.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
        <version>3.0.9.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-context</artifactId>
        <version>3.0.11.RELEASE</version>
    </dependency>
</dependencies>
Logs:
15:19:05.732 [main] INFO DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
15:19:05.735 [main] INFO DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
15:19:05.737 [main] INFO DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
15:19:05.764 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.768 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.773 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.776 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.786 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.787 [main] INFO PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
15:19:05.935 [main] INFO TomcatWebServer - Tomcat initialized with port(s): 8080 (http)
15:19:05.940 [main] INFO Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8080"]
15:19:05.941 [main] INFO StandardService - Starting service [Tomcat]
15:19:05.941 [main] INFO StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.38]
15:19:05.983 [main] INFO [/] - Initializing Spring embedded WebApplicationContext
15:19:05.983 [main] INFO ServletWebServerApplicationContext - Root WebApplicationContext: initialization completed in 711 ms
15:19:06.119 [main] INFO ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
15:19:06.377 [main] INFO ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
15:19:06.385 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.402 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.403 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.418 [main] INFO DirectWithAttributesChannel - Channel 'application.input' has 1 subscriber(s).
15:19:06.421 [main] INFO SimpleFunctionRegistry - Looking up function 'consume' with acceptedOutputTypes: []
15:19:06.501 [main] INFO IntegrationMBeanExporter - Registering MessageChannel nullChannel
15:19:06.514 [main] INFO IntegrationMBeanExporter - Registering MessageChannel input
15:19:06.546 [main] INFO IntegrationMBeanExporter - Registering MessageChannel errorChannel
15:19:06.566 [main] INFO IntegrationMBeanExporter - Registering MessageHandler errorLogger
15:19:06.580 [main] INFO EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
15:19:06.580 [main] INFO PublishSubscribeChannel - Channel 'application.errorChannel' has 1 subscriber(s).
15:19:06.581 [main] INFO EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
15:19:06.581 [main] INFO DefaultBinderFactory - Creating binder: kafkaBinder
15:19:06.672 [main] INFO DefaultBinderFactory - Caching the binder: kafkaBinder
15:19:06.672 [main] INFO DefaultBinderFactory - Retrieving cached binder: kafkaBinder
15:19:06.716 [main] INFO AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [192.168.86.23:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
15:19:06.740 [main] INFO AppInfoParser - Kafka version: 2.5.1
15:19:06.740 [main] INFO AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.740 [main] INFO AppInfoParser - Kafka startTimeMs: 1605302346740
15:19:06.979 [main] INFO ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [192.168.86.23:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupA
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
15:19:06.997 [main] INFO AppInfoParser - Kafka version: 2.5.1
15:19:06.997 [main] INFO AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
15:19:06.997 [main] INFO AppInfoParser - Kafka startTimeMs: 1605302346997
15:19:07.014 [main] INFO Metadata - [Consumer clientId=consumer-groupA-1, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 0 subscriber(s).
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 1 subscriber(s).
15:19:07.031 [main] INFO BinderErrorChannel - Channel 'students.groupA.errors' has 2 subscriber(s).
15:19:07.038 [main] INFO ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [192.168.86.23:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupA
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
15:19:07.040 [main] INFO AppInfoParser - Kafka version: 2.5.1
15:19:07.040 [main] INFO AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
15:19:07.040 [main] INFO AppInfoParser - Kafka startTimeMs: 1605302347040
15:19:07.041 [main] INFO KafkaConsumer - [Consumer clientId=consumer-groupA-2, groupId=groupA] Subscribed to topic(s): students
15:19:07.042 [main] INFO ThreadPoolTaskScheduler - Initializing ExecutorService
15:19:07.044 [main] INFO KafkaMessageDrivenChannelAdapter - started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6232ffdb
15:19:07.045 [main] INFO Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
15:19:07.055 [main] INFO TomcatWebServer - Tomcat started on port(s): 8080 (http) with context path ''
15:19:07.059 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO Metadata - [Consumer clientId=consumer-groupA-2, groupId=groupA] Cluster ID: WmLQLxSaRrqxST80G6w-5w
15:19:07.060 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Discovered group coordinator DESKTOP-8T65SGP.lan:9092 (id: 2147483647 rack: null)
15:19:07.061 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.066 [main] INFO Application - Started Application in 2.093 seconds (JVM running for 2.429)
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
15:19:07.074 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] (Re-)joining group
15:19:07.079 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Finished assignment for group at generation 73: {consumer-groupA-2-4e498561-d307-409b-bb51-09a57eff8b81=Assignment(partitions=[students-0])}
15:19:07.084 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO AbstractCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Successfully joined group with generation 73
15:19:07.086 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Adding newly assigned partitions: students-0
15:19:07.094 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-groupA-2, groupId=groupA] Setting offset for partition students-0 to the committed offset FetchPosition{offset=19, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[DESKTOP-8T65SGP.lan:9092 (id: 0 rack: null)], epoch=0}}
15:19:07.095 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO KafkaMessageChannelBinder$1 - groupA: partitions assigned: [students-0]
15:19:45.344 [KafkaConsumerDestination{consumerDestinationName='students', partitions=1, dlqName='null'}.container-0-C-1] INFO MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
Kafka message received---> student info received
Comment_1: After this line of code is executed, this.foo is still NULL.
Comment_2: I don't think this config is necessary, since ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG is already specified in ConfigCustomizerTestConfig. However, the line of code at Comment_1 is never hit if the config is removed from application.yml.
I guess something is missing in my code. Any advice would be appreciated.
Update: I cannot find fooBean or MyConsumerInterceptor in the consumer configs in the logs.
Answer
Ah - you are using a named binder intended for multi-binder support; this technique only works for a single top-level binder. In that case, the interceptor.classes property is empty; that's why you needed to add it to the YAML. This is a known problem with multi-binder support. If you are only using one binder, don't name it; just define it at the top level.
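As a sketch of that suggestion (assuming a single Kafka binder and keeping the same binding names as in the question), the named-binder `environment` block can be flattened so the broker list sits at the top-level `spring.cloud.stream.kafka.binder` path:

```yaml
spring:
  cloud:
    function:
      definition: consume
    stream:
      function:
        bindings:
          consume-in-0: input
      bindings:
        input:
          destination: students
          group: groupA
      kafka:
        binder:
          brokers: <IPs>
```

With no `binders:` section and no `binder: kafkaBinder` on the binding, the single Kafka binder is resolved as the default, and the ConsumerConfigCustomizer bean is applied to it without the workaround below.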
The following is a workaround for named binders - override the binder factory bean. Unfortunately, I needed to copy some private code, but this works...
@SpringBootApplication
public class So64826496Application {

    public static void main(String[] args) {
        SpringApplication.run(So64826496Application.class, args);
    }

    public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

        private Foo foo;

        @Override
        public void configure(Map<String, ?> configs) {
            this.foo = (Foo) configs.get("fooBean");
            System.out.println(this.foo);
        }

        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            this.foo.foo("consumer interceptor");
            return records;
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }

        @Override
        public void close() {
        }

    }

    // BEGIN HACK TO INJECT CUSTOMIZER INTO BINDER

    @Autowired(required = false)
    private Collection<DefaultBinderFactory.Listener> binderFactoryListeners;

    @Bean
    public BinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry,
            BindingServiceProperties bindingServiceProperties) {

        DefaultBinderFactory binderFactory = new DefaultBinderFactory(
                getBinderConfigurations(binderTypeRegistry, bindingServiceProperties),
                binderTypeRegistry) {

            @Override
            public synchronized <T> Binder<T, ?, ?> getBinder(String name,
                    Class<? extends T> bindingTargetType) {

                Binder<T, ?, ?> binder = super.getBinder(name, bindingTargetType);
                if (binder instanceof KafkaMessageChannelBinder) {
                    ((KafkaMessageChannelBinder) binder).setConsumerConfigCustomizer(consumerConfigCustomizer());
                }
                return binder;
            }

        };
        binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
        binderFactory.setListeners(this.binderFactoryListeners);
        return binderFactory;
    }

    // Had to copy this because it's private in BindingServiceConfiguration
    private static Map<String, BinderConfiguration> getBinderConfigurations(
            BinderTypeRegistry binderTypeRegistry,
            BindingServiceProperties bindingServiceProperties) {

        Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
        Map<String, BinderProperties> declaredBinders = bindingServiceProperties.getBinders();
        boolean defaultCandidatesExist = false;
        Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders
                .entrySet().iterator();
        while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
            defaultCandidatesExist = binderPropertiesIterator.next().getValue()
                    .isDefaultCandidate();
        }
        List<String> existingBinderConfigurations = new ArrayList<>();
        for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders.entrySet()) {
            BinderProperties binderProperties = binderEntry.getValue();
            if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
                binderConfigurations.put(binderEntry.getKey(),
                        new BinderConfiguration(binderEntry.getKey(),
                                binderProperties.getEnvironment(),
                                binderProperties.isInheritEnvironment(),
                                binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            }
            else {
                Assert.hasText(binderProperties.getType(),
                        "No 'type' property present for custom binder "
                                + binderEntry.getKey());
                binderConfigurations.put(binderEntry.getKey(),
                        new BinderConfiguration(binderProperties.getType(),
                                binderProperties.getEnvironment(),
                                binderProperties.isInheritEnvironment(),
                                binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            }
        }
        for (Map.Entry<String, BinderConfiguration> configurationEntry : binderConfigurations
                .entrySet()) {
            if (configurationEntry.getValue().isDefaultCandidate()) {
                defaultCandidatesExist = true;
            }
        }
        if (!defaultCandidatesExist) {
            for (Map.Entry<String, BinderType> binderEntry : binderTypeRegistry.getAll()
                    .entrySet()) {
                if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
                    binderConfigurations.put(binderEntry.getKey(),
                            new BinderConfiguration(binderEntry.getKey(), new HashMap<>(),
                                    true, "integration".equals(binderEntry.getKey()) ? false : true));
                }
            }
        }
        return binderConfigurations;
    }

    // END HACK

    @Bean
    public ConsumerConfigCustomizer consumerConfigCustomizer() {
        return new ConsumerConfigCustomizer() {

            @Override
            public void configure(Map<String, Object> consumerProperties) {
                consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        MyConsumerInterceptor.class.getName());
                consumerProperties.put("fooBean", foo());
            }

        };
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    @Bean
    Consumer<String> consume() {
        return System.out::println;
    }

}
class Foo {

    void foo(String in) {
        System.out.println(in);
    }

}
And:
$ kafka-console-producer --bootstrap-server localhost:9092 --topic consume-in-0
>foo
Result:
consumer interceptor
foo