多个主题的 Kafka 消费者 [英] Kafka consumer for multiple topic

查看:86
本文介绍了多个主题的 Kafka 消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个主题列表(现在是 10 个),它们的大小将来会增加.我知道我们可以产生多个线程(每个主题)来从每个主题消费,但在我的情况下,如果主题数量增加,那么从主题消费的线程数量会增加,这是我不想要的,因为主题不是将过于频繁地获取数据,因此线程将处于理想状态.

I have a list of topics (for now it's 10) whose size can increase in future. I know we can spawn multiple threads (per topic) to consume from each topic, but in my case if the number of topics increases, then the number of threads consuming from the topics increases, which I do not want, since the topics are not going to get data too frequently, so the threads will sit ideal.

有没有办法让一个消费者从所有主题中消费?如果是,那么我们如何才能实现它?Kafka 将如何维护偏移量?请提出答案.

Is there any way to have a single consumer to consume from all topics? If yes, then how can we achieve it? Also how will the offset be maintained by Kafka? Please suggest answers.

推荐答案

我们可以使用以下 API 订阅多个主题:consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)

We can subscribe for multiple topic using following API : consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)

Consumer 拥有主题信息,我们可以通过创建 OffsetAndMetadata 对象使用 consumer.commitAsync 或 consumer.commitSync() 提交,如下所示.

Consumer has the topic info and we can comit using consumer.commitAsync or consumer.commitSync() by creating OffsetAndMetadata object as follows.

ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}

这篇关于多个主题的 Kafka 消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆