编写customConsumerFactory和customKafkaListenerContainerFactory时不会自动加载Spring Kafka属性 [英] spring kafka properties not auto loaded when writing customConsumerFactory and customKafkaListenerContainerFactory
问题描述
我想从application.properties加载spring-kafka属性,并且必须使用spring自动配置加载.我的问题是由以下原因引起的:java.lang.IllegalStateException:没有可用的确认作为参数,侦听器容器必须具有MANUAL AckMode才能填充确认,但是我已经在属性文件spring.kafka.listener.ack-mode =中进行了设置但是,由于此属性是我的自定义fooKafkaListenerContainerFactory,因此在此属性中需要手动立即执行.无法选择此设置.我想要的是没有手动设置的情况,应该从我的application.properies中拾取它.@Gary Russell感谢您的帮助.
I want to load my spring-kafka properties from application.properties and that must be loaded using spring auto configuration. My problem is Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment however I have already set it in properties file spring.kafka.listener.ack-mode=manual-immediate in this properties however because it's my custom fooKafkaListenerContainerFactory It's not able to pick this settings. What I want is without setting it manually it should be picked up from my application.properies. @Gary Russell your help is appreciated.
我的代码如下所示
package com.foo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.foo.FooKafkaDTO;
@Configuration
public class KafkaConsumerConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
@Bean
public ConsumerFactory<String, FooKafkaDTO> fooConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(FooKafkaDTO.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FooKafkaDTO> fooKafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<String, FooKafkaDTO> fooConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, FooKafkaDTO> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(fooConsumerFactory());
return factory;
}
}
Here are my properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable.auto.commit=false
spring.kafka.consumer.key-deserialize=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserialize=org.springframework.kafka.support.serializer.JsonDeserializer
Here is my listener
@Service
public class Consumer {
private static final Log LOG = LogFactory.getLog(Consumer.class);
@KafkaListener(
topicPartitions = {@TopicPartition(topic = "outbox.foo",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))},
groupId = "group_id",
containerFactory = "fooKafkaListenerContainerFactory")
public void consume(@Payload FooKafkaDTO fooKafkaDTO, Acknowledgment acknowledgment,
@Headers MessageHeaders headers) {
LOG.info("offset:::" + Long.valueOf(headers.get(KafkaHeaders.OFFSET).toString()));
LOG.info(String.format("$$ -> Consumed Message -> %s", fooKafkaDTO));
acknowledgment.acknowledge();
}
}
推荐答案
After going through the documentation of spring-kafka spring-kafka-official-documentation! I could find this code which replaced the whole boilerplate code. I have simplified my KafkaConsumerConfig class and it looks like below now.
package com.foo
import java.util.Map;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.foo.FooKafkaDTO;
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultKafkaConsumerFactory fooDTOConsumerFactory(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory(props,
new JsonDeserializer<>(String.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(FooKafkaDTO.class)
.ignoreTypeHeaders());
}
}
这篇关于编写customConsumerFactory和customKafkaListenerContainerFactory时不会自动加载Spring Kafka属性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!