获取发送给kafka主题的最后一条消息 [英] Getting the last message sent to a kafka topic

查看:343
本文介绍了获取发送给kafka主题的最后一条消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Kafka的新手,正在研究将专有流服务连接到Kafka的原型.

I'm new to Kafka and working on a prototype to connect a proprietary streaming service into Kafka.

我希望获取有关该主题的最后一条消息的密钥,因为我们的内部流消费者需要使用连接时收到的最后一条消息的ID进行登录.

I'm looking to get the key of the last message sent on a topic as our in-house stream consumer needs to logon with the ID of the last message it received when connecting.

是否可以使用KafkaProducer或KafkaConsumer来做到这一点?

Is it possible, using either the KafkaProducer or a KafkaConsumer to do this?

我尝试使用使用者执行以下操作,但是当同时运行控制台使用者时,我会看到消息重播.

I've attempted to do the following using a Consumer, but when also running the console consumer I see messages replayed.

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for(TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition,offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        lastKey = record.key();
    }
    consumer.close();

这是预期的行为还是我走错了路?

Is this expected behaviour or am I on the wrong path?

推荐答案

问题出在final long offset = consumer.committed(partition).offset()行上,就像

The problem is on line final long offset = consumer.committed(partition).offset(), as link api refers committed method is to get the last committed offset for the given partition, i.e: the last offset your consumer tell kafka server that it had already read. So, definitely you will got messages replayed, because you always read from specific offset. As I think I only have to remove the first for block.

这篇关于获取发送给kafka主题的最后一条消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆