卡夫卡消费者的多个主题 [英] Kafka consumer for multiple topic

查看:425
本文介绍了卡夫卡消费者的多个主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个主题列表(目前为10个),其规模将来可能会增加.我知道我们可以在每个主题中产生多个线程(每个主题)使用,但是在我的情况下,如果主题数量增加,那么从主题中使用的线程数量就会增加,这是我不希望的,因为主题不是会太频繁地获取数据,因此线程将处于理想状态.

I have a list of topics (for now it's 10) whose size can increase in future. I know we can spawn multiple threads (per topic) to consume from each topic, but in my case if the number of topics increases, then the number of threads consuming from the topics increases, which I do not want, since the topics are not going to get data too frequently, so the threads will sit ideal.

有没有办法让单个消费者从所有主题中消费?如果是,那我们如何实现呢?另外,Kafka将如何维护偏移量?请提出答案.

Is there any way to have a single consumer to consume from all topics? If yes, then how can we achieve it? Also how will the offset be maintained by Kafka? Please suggest answers.

推荐答案

我们可以使用以下API订阅多个主题: Consumer.subscribe(Arrays.asList(topic1,topic2),ConsumerRebalanceListener obj)

We can subscribe for multiple topic using following API : consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)

消费者具有主题信息,我们可以通过如下创建OffsetAndMetadata对象来使用consumer.commitAsync或consumer.commitSync().

Consumer has the topic info and we can comit using consumer.commitAsync or consumer.commitSync() by creating OffsetAndMetadata object as follows.

ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}

这篇关于卡夫卡消费者的多个主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆