有没有办法从Kafka主题获取最新消息? [英] Is there a way to get the last message from Kafka topic?

查看:60
本文介绍了有没有办法从Kafka主题获取最新消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个带有多个分区的Kafka主题,我想知道Java中是否有一种方法可以获取该主题的最后一条消息.我不在乎我只是想获取最新消息的分区.

I have a Kafka topic with multiple partitions and I wonder if there is a way in Java to fetch the last message for the topic. I don't care for the partitions I just want to get the latest message.

我尝试了 @KafkaListener ,但是只有在主题更新时,它才会获取消息.如果打开应用程序后没有发布任何内容,则不会返回任何内容.

I have tried @KafkaListener but it fetches the message only when the topic is updated. If there is nothing published after the application is opened nothing is returned.

也许听众根本不是解决问题的正确方法?

Maybe the listener is not the right approach to the problem at all?

推荐答案

以下代码段对我有用.您可以尝试一下.注释中的解释.

This following snippet worked for me. You may try this. Explanation in the comments.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        consumer.poll(Duration.ofSeconds(10));

        consumer.assignment().forEach(System.out::println);

        AtomicLong maxTimestamp = new AtomicLong();
        AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();

        // get the last offsets for each partition
        consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
            System.out.println("offset: "+offset);

            // seek to the last offset of each partition
            consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);

            // poll to get the last record in each partition
            consumer.poll(Duration.ofSeconds(10)).forEach(record -> {

                // the latest record in the 'topic' is the one with the highest timestamp
                if (record.timestamp() > maxTimestamp.get()) {
                    maxTimestamp.set(record.timestamp());
                    latestRecord.set(record);
                }
            });
        });
        System.out.println(latestRecord.get());

这篇关于有没有办法从Kafka主题获取最新消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆