Apache Kafka - KafkaStream on topic/partition

Problem description

I am writing a Kafka consumer for a high-volume, high-velocity distributed application. I have only one topic, but the rate of incoming messages is very high. Having multiple partitions that serve more consumers would be appropriate for this use case, and the best way to consume would be to have multiple stream readers. According to the documentation and the available samples, the number of KafkaStreams the ConsumerConnector hands out is based on the number of topics. How can I get more than one KafkaStream reader (based on the partitions), so that I can spawn one thread per stream? Or would reading from the same KafkaStream in multiple threads give concurrent reads from multiple partitions?

Any insights are much appreciated.

Recommended answer

I'd like to share what I found on the mailing list:

The number that you pass in the topic map controls how many streams a topic is divided into. In your case (a topic with 10 partitions), if you pass in 1, all 10 partitions' data will be fed into 1 stream. If you pass in 2, each of the 2 streams will get data from 5 partitions. If you pass in 11, 10 of them will each get data from 1 partition and 1 stream will get nothing.
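The arithmetic in that answer can be reproduced with a small simulation. This is a minimal sketch, assuming partitions are split into contiguous ranges and that the first `numPartitions % numStreams` streams each take one extra partition; `StreamPartitionSketch` and `assign` are hypothetical names, and this is an illustration, not Kafka's own rebalancing code.

```java
import java.util.ArrayList;
import java.util.List;

class StreamPartitionSketch {
    // Hypothetical helper (not a Kafka API): splits partitions 0..numPartitions-1
    // into numStreams contiguous ranges. The first (numPartitions % numStreams)
    // streams get one extra partition; surplus streams get an empty list.
    static List<List<Integer>> assign(int numPartitions, int numStreams) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numStreams;
        int extra = numPartitions % numStreams;
        int next = 0;
        for (int s = 0; s < numStreams; s++) {
            int count = base + (s < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                parts.add(next++);
            }
            result.add(parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // The three cases from the mailing-list answer: 1, 2 and 11 streams over 10 partitions
        for (int streams : new int[] {1, 2, 11}) {
            System.out.println(streams + " stream(s): " + assign(10, streams));
        }
    }
}
```

With 10 partitions, `assign(10, 2)` yields two lists of five partitions each, and `assign(10, 11)` leaves the eleventh list empty, matching the answer; an uneven case such as `assign(10, 3)` gives sizes 4, 3 and 3.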

Typically, you need to iterate over each stream in its own thread, because each stream can block forever if there is no new event.
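Why one thread per stream matters can be shown without a broker. In this sketch each `LinkedBlockingQueue` is a stand-in for a KafkaStream (an assumption for illustration only): `take()` blocks on an empty queue, much as the answer describes a stream blocking while no new events arrive. `ThreadPerStreamSketch`, `readsProceedWhileOneStreamIdle` and the `POISON` marker are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ThreadPerStreamSketch {
    // Hypothetical shutdown marker used only by this simulation.
    static final String POISON = "__END__";

    // Simulates 3 streams with one reader thread each. Stream 1 never receives data,
    // so its reader stays blocked in take(); returns true if the other readers still
    // consumed all 3 messages within the timeout.
    static boolean readsProceedWhileOneStreamIdle() {
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            streams.add(new LinkedBlockingQueue<>());
        }
        CountDownLatch allMessagesRead = new CountDownLatch(3);
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        for (BlockingQueue<String> stream : streams) {
            executor.submit(() -> {
                try {
                    // take() blocks while this stream is empty, stalling only this thread
                    for (String msg = stream.take(); !msg.equals(POISON); msg = stream.take()) {
                        allMessagesRead.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        streams.get(0).add("a");
        streams.get(0).add("b");
        streams.get(2).add("c");
        boolean result;
        try {
            result = allMessagesRead.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result = false;
        }
        for (BlockingQueue<String> stream : streams) {
            stream.add(POISON); // release every reader, including the idle one
        }
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println("messages read while stream 1 was idle: " + readsProceedWhileOneStreamIdle());
    }
}
```

A single reader that visited the queues in turn with `take()` would eventually block on the empty stream 1 and never get to message `c` on stream 2; giving each stream its own thread confines the blocking to the idle stream.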

Sample code snippet:

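import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.KafkaStream;

// Request one stream per partition of msgTopic
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put(msgTopic, partitionCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic);

// Each stream blocks while it has no new messages, so give each one its own task
for (final KafkaStream<byte[], byte[]> stream : streams) {
    ReadTask task = new ReadTask(stream, msgTopic);
    task.addObserver(this.msgObserver);
    tasks.add(task);
    executor.submit(task);
}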

Reference: http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sHyy_Z903dOmnjp7_yYR_aE2sRW-x7XpAnqkmWaP66GOqf6w@mail.gmail.com%3E