Kafka: Have each poll() call only consume from one topic at a time?

Question

I have a single Kafka consumer that consumes from multiple Kafka topics. I'd like to batch writes to my destination, using one I/O call per 100 messages, but to batch them, all the messages need to come from the same topic.

Say I have several topics (five, for example) and each call to consumer.poll or consumer.consume returns around 100 messages. Is there a way to ensure that all of them come from the same topic, so that they can be batch written to the same destination, and that the next .poll call then returns messages from the next topic?

Recommended answer

It is not possible to poll per topic. You subscribe to a list of topics, and each topic may have multiple partitions. A given poll returns a ConsumerRecords object, which is a container of ConsumerRecord instances. Each ConsumerRecord represents a key-value pair that belongs to one partition of one of the topics you have subscribed to.

Kafka assigns TopicPartitions to the consumers in a group according to an assignor. If the group has only one consumer, that consumer is assigned all partitions of all subscribed topics. Nothing stops you from grouping the records by topic in your application code, though.

For example:

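import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// kafkaConsumer is assumed to be a KafkaConsumer<String, String> field of the enclosing class.
private void consume() {
    List<String> topics = List.of("topic1", "topic2", "topic3", "topic4", "topic5");
    kafkaConsumer.subscribe(topics);

    while (true) {
        // poll(long) is deprecated since Kafka 2.0; pass a Duration instead.
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));

        // One poll can return records from any subscribed topic, so split them by topic.
        topics.forEach(topic -> {
            List<ConsumerRecord<String, String>> recordsPerTopic = new ArrayList<>();
            consumerRecords.records(topic).forEach(recordsPerTopic::add);
            if (!recordsPerTopic.isEmpty()) {
                doWhatever(recordsPerTopic);
            }
        });
    }
}

private void doWhatever(List<ConsumerRecord<String, String>> consumerRecords) {
    // process one topic's records as a single batch
}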
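
The grouping above yields whatever each poll happens to return per topic, which may be more or fewer than the 100 messages per write mentioned in the question. One way to get fixed-size batches is to buffer records per topic across polls and write only when a buffer fills. The following is a minimal sketch in plain Java, not part of the Kafka API: the TopicBatcher class, its batchSize parameter, and the sink callback (standing in for the destination write) are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical helper: buffers values per topic and emits a batch once it reaches batchSize. */
class TopicBatcher<V> {
    private final int batchSize;
    // Stand-in for the destination write: receives the topic name and one full batch.
    private final BiConsumer<String, List<V>> sink;
    private final Map<String, List<V>> buffers = new LinkedHashMap<>();

    TopicBatcher(int batchSize, BiConsumer<String, List<V>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Adds one value to its topic's buffer and flushes that buffer when it is full. */
    void add(String topic, V value) {
        List<V> buffer = buffers.computeIfAbsent(topic, t -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush(topic);
        }
    }

    /** Writes out whatever is buffered for one topic; does nothing if the buffer is empty. */
    void flush(String topic) {
        List<V> buffer = buffers.get(topic);
        if (buffer == null || buffer.isEmpty()) {
            return;
        }
        List<V> batch = new ArrayList<>(buffer);
        buffer.clear();
        sink.accept(topic, batch);
    }

    /** Flushes all partial batches, for example on shutdown or on a timer. */
    void flushAll() {
        for (String topic : new ArrayList<>(buffers.keySet())) {
            flush(topic);
        }
    }
}
```

Inside the poll loop this could be fed with something like `for (ConsumerRecord<String, String> r : consumerRecords) batcher.add(r.topic(), r.value());`. Because records then sit in memory between polls, offsets should probably be committed only after the corresponding batch has been written if messages must not be lost on a crash.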
