如何在Java中一一从Kafka Consumer中获取消息? [英] How to get messages from Kafka Consumer one by one in java?

查看:512
本文介绍了如何在Java中一一从Kafka Consumer中获取消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Apache Kafka API并尝试一次仅获取一条消息.我只写一个话题.我可以通过带有文本框的弹出UI屏幕来发送和接收消息.我在文本框中输入一个字符串,然后单击发送".我可以发送任意多的邮件.假设我发送了3条消息,而我的3条消息为嗨", 大声笑" 再见."还存在接收"消息.按钮.现在,使用 TutorialsPoint ,当我单击接收按钮时,我立即获得所有3条消息(大声笑,再见).但是,当我单击接收"时,我只希望一次打印一条消息.在用户界面上.例如,当我第一次点击接收按钮时,它将打印"hi".第二次是大声笑",并且第三次将是再见".我是Kafka的新手,对如何执行此操作感到困惑.我尝试从代码中删除这两个循环,所以它只是

I'm using Apache Kafka API and trying to get only one message at a time. I'm only writing to one topic. I can send and receive messages by having a pop UI screen with a textbox. I input a string in the textbox and click "send." I can send as many messages as I want. Let's say I send 3 messages and my 3 messages were "" "lol," "bye." There is also a "receive" button. Right now, using the traditional code found in TutorialsPoint, I get all 3 messages ( lol, bye) at once printed on the console when I click on the receive button. However, I want only want one message to be printed at a time when I click "receive" on the UI. For example, the first time I hit the receive button, it would print "" the second time would be "lol," and the third time would be "bye." I am new to Kafka and am confused on how to do this. I tried removing both the loops from the code so it just has

ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());

如果我只有这两行代码,则在我第一次点击接收按钮时,它将打印"hi"字样.但是我第二次按下该消息时,会收到消息由于组正在重新平衡kafka,因此尝试心跳失败."设置max.poll.records = 1时也出现错误,因为我希望最终所有消息都出现,但是当按下接收按钮时,只需要将其中一条记录到控制台即可.下次,将记录主题中未记录的下一条消息.

If I just have those 2 lines of code, the first time I hit the receive button, it would print "hi" but the second time I press it, get the message "attempt to heartbeat failed since group is rebalancing kafka." I'm getting errors when I set the max.poll.records = 1 too as I want all my messages eventually but just one of them needs to be logged to the console when the receive button is pressed. The next time, the next message in the topic not logged would be logged.

希望如此! 感谢任何帮助! 预先感谢!

Hope that makes sense! Appreciate any help! Thanks in advance!

包括队列后的新代码,因此我们可以在发送和接收消息之间进行切换每当有新消息时更新队列:

New code after including queues and also so we can alternate between sending and receiving messages & update the queue whenever there is a new message:

        if (payloadQueue.isEmpty()){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                log.info("No More Records to Add");
                consumer.close();
            }
            else {
                records.iterator().forEachRemaining(record -> {
                    log.info("RECORD: " + record);
                    payloadQueue.offer(record);
                });
                payload = payloadQueue.poll().value();
                log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
            }
        }
        else {
            payload = payloadQueue.poll().value();
            log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
        }

推荐答案

Kafka使用批处理get来提高性能,实际上没有必要设置max.poll.records = 1.
您可以通过一些解决方法轻松实现所需的目标.

Kafka use batch get to improve performance, it's really not necessary to set max.poll.records=1.
What you want can be easily achieved with some workaround.

您可以有一个Queue来存储消息,每按一次接收按钮,就会从队列中轮询一条消息,如果队列为空,则调用consumer.poll来填充队列.

You can have a Queue to store the message, each time the receive button is pressed, you poll one message from the queue, if the queue is empty, you call consumer.poll to fill the queue.

    private Queue<ConsumerRecord<String,String>> queue=new LinkedList<>();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    public void buttonPressed(){
        if (queue.isEmpty()){
            consumer.poll(100).iterator().forEachRemaining(record->queue.offer(record));
        }else {
            System.out.println(queue.poll());
        }
    }

这篇关于如何在Java中一一从Kafka Consumer中获取消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆