如何使用Kafka主题中特定偏移量到特定偏移量的数据? [英] How to consume data from kafka topic in specific offset to specific offset?

查看:314
本文介绍了如何使用Kafka主题中特定偏移量到特定偏移量的数据?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要消耗特定的偏移量到特定的结束偏移量!! Consumer.seek()从特定的偏移量读取数据,但是我需要从offset到tooffset检索数据!任何帮助将不胜感激,在此先感谢.

I need to consume specific offset to specific end offset!! consumer.seek() reads the data from specific offset but I need retrieve the data fromoffset to tooffset !! Any help will be appreciate , thanks in advance.

    ConsumerRecords<String, String> records = consumer.poll(100);
    if(flag) {
        consumer.seek(new TopicPartition("topic-1", 0), 90);
        flag = false;
    }

推荐答案

要从开始偏移量到结束偏移量读取消息,首先需要使用seek()将使用者移动到所需的起始位置,然后再将,直到达到所需的终点偏移为止.

To read messages from a start offset to an end offset, you first need to use seek() to move the consumer at the desired starting location and then poll() until you hit the desired end offset.

例如,要从偏移量100消耗到200:

For example, to consume from offset 100 to 200:

String topic = "test";
TopicPartition tp = new TopicPartition(topic, 0);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Move to the desired start offset 
            consumer.seek(tp, 100L);
        }
    });
    boolean run = true;
    long lastOffset = 200L;
    while (run) {
        ConsumerRecords<String, String> crs = consumer.poll(Duration.ofMillis(100L));
        for (ConsumerRecord<String, String> record : crs) {
            System.out.println(record);
            if (record.offset() == lastOffset) {
                // Reached the end offsey, stop consuming
                run = false;
                break;
            }
        }
    }
}

这篇关于如何使用Kafka主题中特定偏移量到特定偏移量的数据?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆