seekToEnd 所有分区并在 Kafka 消费者的自动重新平衡中幸存下来 [英] seekToEnd of all partitions and survive automatic rebalancing of Kafka consumers

查看:39
本文介绍了seekToEnd 所有分区并在 Kafka 消费者的自动重新平衡中幸存下来的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当消费者组 A 的 Kafka 消费者连接到 Kafka 代理时,我想寻找所有分区的末尾,即使在代理端存储了偏移量.如果更多额外的消费者连接到同一个消费者组,他们应该获取最新存储的偏移量.我正在执行以下操作:

When a Kafka consumer of consumer group A connects to the Kafka broker I would like to seek to the end of all partitions, even if an offset is stored on the broker side. If more additional consumers are connecting for the same consumer group they should pickup the latest stored offsets. I am doing the following:

consumer.poll(timeout) 
consumer.seekToEnd(emptyList())

while(true) {
  val records = consumer.poll(timeout)
  if(records.isNotEmpty()) {
    //print records
    consumer.commitSync()
  }
}

问题是,当我连接消费者组 A 的第一个消费者 c1 时,一切正常,如果我连接消费者组 A 的额外消费者 c2,则该组正在重新平衡,并且 c1 将消耗跳过的偏移量.

The problem is when I connect the first consumer c1 of consumer group A everything works as expected, if I connect an additional consumer c2 of consumer group A, the group is rebalancing and c1 will consume the skipped offsets.

有什么想法吗?

推荐答案

你可以创建一个实现ConsumerRebalanceListener的类,如下图:

You could create a class which implements ConsumerRebalanceListener, as shown below:

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {

    private Consumer<K, V> consumer;

    public AlwaysSeekToEndListener(Consumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
    }
}

然后在订阅主题时使用这个监听器:

Then use this listener when subscribing the topics:

consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));

这篇关于seekToEnd 所有分区并在 Kafka 消费者的自动重新平衡中幸存下来的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆