Kafka: Proper Way to Poll for No Records

Problem Description

To keep my consumer alive during very long, variable-length processing, I'm making an empty poll() call in a background thread so that the broker does not rebalance the group when I spend too much time between poll() calls. I have set my poll interval (max.poll.interval.ms) very high, but I don't want to keep increasing it forever as processing gets longer and longer.

What's the proper way to poll for no records? Currently I'm calling poll() and then seeking back to the earliest offset of each partition returned by that poll() call, so that the main thread can read those records properly once it has finished processing the previous messages.

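ConsumerRecords<String, String> msgs = kafkaConsumer.poll(timeout);
// helper method: earliest offset returned for each partition in this batch
Map<TopicPartition, Long> partitionToOffsets = getEarliestPartitionOffsets(msgs);
// helper method: seek() each partition back so the main thread receives these records again
seekToOffsets(partitionToOffsets);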
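For reference, the seek-back workaround from the question could look roughly like this. The helper names come from the question, but their bodies are not shown there, so these implementations (and the extra consumer parameter on seekToOffsets) are assumptions, and the sketch assumes the kafka-clients library is on the classpath. Keep in mind that KafkaConsumer is not safe for multi-threaded access: it detects calls made concurrently from different threads and throws ConcurrentModificationException, so a background thread polling the same consumer that the main thread uses is fragile by design.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public final class SeekBackHelpers {
    private SeekBackHelpers() {}

    // Hypothetical helper: the lowest offset returned for each partition in this batch.
    public static <K, V> Map<TopicPartition, Long> getEarliestPartitionOffsets(ConsumerRecords<K, V> records) {
        Map<TopicPartition, Long> earliest = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            records.records(tp).stream()
                    .mapToLong(ConsumerRecord::offset)
                    .min()
                    .ifPresent(offset -> earliest.put(tp, offset));
        }
        return earliest;
    }

    // Hypothetical helper: rewind each partition so the next poll() fetches those records again.
    // Must run on the same thread as every other call on this consumer.
    public static void seekToOffsets(Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets) {
        offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
    }
}
```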

Recommended Answer

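The proper way to handle long processing times (and avoid a consumer rebalance) is to use the KafkaConsumer.pause() / KafkaConsumer.resume() methods. While partitions are paused, poll() returns no records from them, so you can keep calling poll() often enough to satisfy max.poll.interval.ms without having to seek back afterwards. You can read more about it here: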
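A minimal sketch of the pause/resume pattern, assuming the kafka-clients library is on the classpath; the class and method names (PauseResumeLoop, processWhilePaused) are hypothetical. The polling thread pauses its assigned partitions, hands the long-running work to a separate thread, and keeps calling poll() until the work finishes. Partitions that a rebalance assigns during the pause are not paused, so any records they return are rewound and those partitions are paused too. The work itself must never touch the consumer, revocation of in-flight partitions is not handled, and committing offsets is left to the caller.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public final class PauseResumeLoop {
    private PauseResumeLoop() {}

    // Hypothetical helper: runs long work on another thread while this (the consumer's)
    // thread keeps calling poll() on paused partitions, so the member stays in the group.
    public static <K, V> void processWhilePaused(Consumer<K, V> consumer,
                                                 Runnable work,
                                                 ExecutorService worker,
                                                 Duration pollTimeout)
            throws InterruptedException, ExecutionException {
        consumer.pause(consumer.assignment());
        Future<?> done = worker.submit(work); // work must NOT use the consumer
        while (!done.isDone()) {
            ConsumerRecords<K, V> unexpected = consumer.poll(pollTimeout);
            // Paused partitions return nothing; records here can only come from partitions
            // (re)assigned by a rebalance, so rewind them to be fetched again after resume().
            for (TopicPartition tp : unexpected.partitions()) {
                long first = unexpected.records(tp).stream()
                        .mapToLong(r -> r.offset())
                        .min()
                        .getAsLong();
                consumer.seek(tp, first);
            }
            // Re-pause everything, including any newly assigned partitions.
            consumer.pause(consumer.assignment());
        }
        done.get(); // rethrows any exception thrown by the work
        consumer.resume(consumer.assignment());
    }
}
```

In use, after poll() returns a batch you would call something like processWhilePaused(consumer, () -> handle(batch), executor, Duration.ofMillis(100)), where handle is a placeholder for your own processing, and commit once it returns. Since poll() on paused partitions typically waits out the full timeout, that timeout should stay well below max.poll.interval.ms.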
