所有分区的seekToEnd并在Kafka消费者自动重新平衡中生存下来 [英] seekToEnd of all partitions and survive automatic rebalancing of Kafka consumers

查看:654
本文介绍了所有分区的seekToEnd并在Kafka消费者自动重新平衡中生存下来的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当消费者组A的Kafka消费者连接到Kafka经纪人时,即使经纪人端存储了偏移量,我也想搜索到所有分区的末尾.如果更多的其他使用者正在为同一使用者组进行连接,则他们应该获取最新存储的偏移量. 我正在执行以下操作:

When a Kafka consumer of consumer group A connects to the Kafka broker I would like to seek to the end of all partitions, even if an offset is stored on the broker side. If more additional consumers are connecting for the same consumer group they should pickup the latest stored offsets. I am doing the following:

consumer.poll(timeout) 
consumer.seekToEnd(emptyList())

while(true) {
  val records = consumer.poll(timeout)
  if(records.isNotEmpty()) {
    //print records
    consumer.commitSync()
  }
}

问题是,当我连接消费者组A的第一个消费者c1时,一切正常,如果我连接消费者组A的另一个消费者c2,则该组正在重新平衡,并且c1将消耗跳过的偏移量.

The problem is when I connect the first consumer c1 of consumer group A everything works as expected, if I connect an additional consumer c2 of consumer group A, the group is rebalancing and c1 will consume the skipped offsets.

有什么想法吗?

推荐答案

您可以创建实现ConsumerRebalanceListener的类,如下所示:

You could create a class which implements ConsumerRebalanceListener, as shown below:

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {

    private Consumer<K, V> consumer;

    public AlwaysSeekToEndListener(Consumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
    }
}

然后在订阅主题时使用此侦听器:

Then use this listener when subscribing the topics:

consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));

这篇关于所有分区的seekToEnd并在Kafka消费者自动重新平衡中生存下来的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆