如何在java中一一从Kafka Consumer获取消息? [英] How to get messages from Kafka Consumer one by one in java?

查看:63
本文介绍了如何在java中一一从Kafka Consumer获取消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Apache Kafka API 并尝试一次只获取一条消息.我只写一个主题.我可以通过带有文本框的弹出 UI 屏幕来发送和接收消息.我在文本框中输入一个字符串,然后单击发送".我可以发送任意数量的消息.假设我发送了 3 条消息,而我的 3 条消息是嗨",哈哈"再见".还有一个接收"按钮.现在,使用 TutorialsPoint,当我单击接收按钮时,我会同时在控制台上打印所有 3 条消息(大声笑,再见).但是,当我单击接收"时,我只想一次打印一条消息.在用户界面上.例如,我第一次点击接收按钮时,它会打印嗨",第二次是大声笑",第三次是再见".我是 Kafka 的新手,对如何做到这一点感到困惑.我尝试从代码中删除两个循环,所以它只有

I'm using Apache Kafka API and trying to get only one message at a time. I'm only writing to one topic. I can send and receive messages by having a pop UI screen with a textbox. I input a string in the textbox and click "send." I can send as many messages as I want. Let's say I send 3 messages and my 3 messages were "" "lol," "bye." There is also a "receive" button. Right now, using the traditional code found in TutorialsPoint, I get all 3 messages ( lol, bye) at once printed on the console when I click on the receive button. However, I want only want one message to be printed at a time when I click "receive" on the UI. For example, the first time I hit the receive button, it would print "" the second time would be "lol," and the third time would be "bye." I am new to Kafka and am confused on how to do this. I tried removing both the loops from the code so it just has

ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());

如果我只有那两行代码,当我第一次点击接收按钮时,它会打印hi";但是我第二次按下它时,收到消息尝试心跳失败,因为组正在重新平衡 kafka."当我设置 max.poll.records = 1 时也出现错误,因为我希望我的所有消息最终,但是当按下接收按钮时,只需要将其中一个消息记录到控制台.下一次,未记录主题中的下一条消息将被记录.

If I just have those 2 lines of code, the first time I hit the receive button, it would print "hi" but the second time I press it, get the message "attempt to heartbeat failed since group is rebalancing kafka." I'm getting errors when I set the max.poll.records = 1 too as I want all my messages eventually but just one of them needs to be logged to the console when the receive button is pressed. The next time, the next message in the topic not logged would be logged.

希望这是有道理的!感谢任何帮助!提前致谢!

Hope that makes sense! Appreciate any help! Thanks in advance!

包含队列后的新代码,因此我们可以在发送和接收消息之间交替每当有新消息时更新队列:

New code after including queues and also so we can alternate between sending and receiving messages & update the queue whenever there is a new message:

        if (payloadQueue.isEmpty()){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                log.info("No More Records to Add");
                consumer.close();
            }
            else {
                records.iterator().forEachRemaining(record -> {
                    log.info("RECORD: " + record);
                    payloadQueue.offer(record);
                });
                payload = payloadQueue.poll().value();
                log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
            }
        }
        else {
            payload = payloadQueue.poll().value();
            log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
        }

推荐答案

Kafka使用batch get提高性能,真的没必要设置max.poll.records=1.
您可以通过一些变通方法轻松实现您想要的.

Kafka use batch get to improve performance, it's really not necessary to set max.poll.records=1.
What you want can be easily achieved with some workaround.

你可以有一个Queue来存储消息,每次按下接收按钮,你从队列中轮询一条消息,如果队列为空,你调用consumer.poll 填充队列.

You can have a Queue to store the message, each time the receive button is pressed, you poll one message from the queue, if the queue is empty, you call consumer.poll to fill the queue.

    private Queue<ConsumerRecord<String,String>> queue=new LinkedList<>();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    public void buttonPressed(){
        if (queue.isEmpty()){
            consumer.poll(100).iterator().forEachRemaining(record->queue.offer(record));
        }else {
            System.out.println(queue.poll());
        }
    }

这篇关于如何在java中一一从Kafka Consumer获取消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆