spring-kafka - 如何从头开始阅读一个主题,而从最后阅读另一个主题? [英] spring-kafka - how to read one topic from the beginning, while reading another one from the end?

查看:15
本文介绍了spring-kafka - 如何从头开始阅读一个主题,而从最后阅读另一个主题?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个 spring-kafka 应用程序,其中我需要阅读 2 个主题:test1 和 test2:

I'm writing a spring-kafka app, in which I need to read 2 topics: test1 and test2:

public class Receiver {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Receiver.class);

    @KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "test1", partitions = { "0" }),
  @TopicPartition(topic = "test2", partitions = { "0" })})
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}

我的配置如下:

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kakfa cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

我只需要能够读取来自test1"的最新消息,同时能够读取test2"开头的所有消息.我只对应用启动时的test2"消息感兴趣,但只要应用正在运行,就需要不断读取test1"消息.

I need to be able to read only the latest messages from "test1", while being able to read all messages from the very beginning of "test2". I'm only interested in "test2" messages upon my app startup, but the "test1" messages need to be read continuously as long as the app is running.

有没有办法配置这样的功能?

Is there a way to configure such functionality?

推荐答案

这是一种对我有用的方法:

Here is a way, which worked for me:

@KafkaListener(id = "receiver-api",         
        topicPartitions =
        { @TopicPartition(topic = "schema.topic", 
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
                @TopicPartition(topic = "data.topic", partitions = { "0" })})
    public void receiveMessage(String message) {
        try {
                JSONObject incomingJsonObject = new JSONObject(message); 
                if(!incomingJsonObject.isNull("data")){
                    handleSchemaMessage(incomingJsonObject);
                }

                else {
                    handleDataMessage(incomingJsonObject);
                }

        } catch (Exception e) {
            e.printStackTrace();
        }

使用partitionOffsets"注解(import org.springframework.kafka.annotation.PartitionOffset;)

Using "partitionOffsets" annotation (import org.springframework.kafka.annotation.PartitionOffset;)

是能够始终从头阅读特定主题的关键,同时像往常一样拖尾"其他主题.

was the key to being able always read a specific topic from the beginning, while "tailing" other topic as usual.

这篇关于spring-kafka - 如何从头开始阅读一个主题,而从最后阅读另一个主题?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆