Kafka consumer - what's the relation of consumer processes and threads with topic partitions
Problem description
I have been working with Kafka lately and have a bit of confusion regarding the consumers under a consumer group. The center of the confusion is whether to implement consumers as processes or threads. For this question, assume I am using the high level consumer.
Let's consider a scenario that I have experimented with. In my topic there are 2 partitions (for simplicity let's assume the replication factor is just 1). I created a consumer (ConsumerConnector) process consumer1 with group group1, then created a topic count map requesting 2 streams for the topic, and then spawned 2 consumer threads, consumer1_thread1 and consumer1_thread2, under that process. It looks like consumer1_thread1 is consuming partition 0 and consumer1_thread2 is consuming partition 1. Is this behaviour always deterministic? Below is the code snippet. Class TestConsumer is my consumer thread class.
...
// Ask the high-level consumer for 2 streams (one per consumer thread) on this topic.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

// Hand each stream to its own worker thread.
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
    executor.submit(new TestConsumer(stream, threadNumber));
    threadNumber++;
}
...
Now, let's consider another scenario (which I haven't tried but am curious about) where I start 2 consumer processes, consumer1 and consumer2, both in the same group group1, each of them single-threaded. Now my questions are:
How will the two independent consumer processes (under the same group nevertheless) be related to the partitions in this case? How is it different from the single-process, multi-thread scenario above?
In general, how are consumer threads or processes mapped / related to partitions in the topic?
The Kafka documentation does say that each consumer under a consumer group will consume one partition. However, does that refer to a consumer thread (like my code example above) or to independent consumer processes?
Is there any subtle thing I am missing here regarding implementing consumers as processes vs threads? Thanks in advance.
A consumer group can have multiple consumer instances running (multiple processes with the same group-id). While consuming, each partition is consumed by exactly one consumer instance in the group.
E.g. if your topic contains 2 partitions and you start a consumer group group-A with 2 consumer instances, then each of them will consume messages from one particular partition of the topic.
If you start the same 2 consumers with different group ids, group-A and group-B, then the messages from both partitions of the topic will be broadcast to each of them. So in that case the consumer instance running under group-A will receive messages from both partitions of the topic, and the same is true for group-B.
Read more on this in their documentation.
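The group semantics described above can be illustrated with a toy model in plain Java. This is only a sketch: it does not use the Kafka client, the class name GroupDeliverySketch and the instance names c1 and c2 are made up for illustration, and the simple round-robin split is an assumption standing in for whatever assignment strategy Kafka actually applies. What it does reflect is that, within a group, each partition has exactly one owner, while separate groups each see every partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT the Kafka client API. All names here are hypothetical.
class GroupDeliverySketch {

    // For every group, give each partition to exactly one instance of that group.
    // Result keys look like "group-A/c1"; values are the owned partition numbers.
    static Map<String, List<Integer>> owners(Map<String, List<String>> groups, int partitionCount) {
        Map<String, List<Integer>> owned = new LinkedHashMap<String, List<Integer>>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            List<String> instances = group.getValue();
            if (instances.isEmpty()) {
                continue; // a group with no running instance owns nothing
            }
            for (String instance : instances) {
                owned.put(group.getKey() + "/" + instance, new ArrayList<Integer>());
            }
            // Assumption: plain round-robin, used here only as a placeholder strategy.
            for (int p = 0; p < partitionCount; p++) {
                String owner = instances.get(p % instances.size());
                owned.get(group.getKey() + "/" + owner).add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Two instances in the same group: the 2 partitions are split between them.
        Map<String, List<String>> oneGroup = new LinkedHashMap<String, List<String>>();
        oneGroup.put("group-A", Arrays.asList("c1", "c2"));
        System.out.println(owners(oneGroup, 2));

        // One instance in each of two groups: both instances own both partitions.
        Map<String, List<String>> twoGroups = new LinkedHashMap<String, List<String>>();
        twoGroups.put("group-A", Arrays.asList("c1"));
        twoGroups.put("group-B", Arrays.asList("c2"));
        System.out.println(owners(twoGroups, 2));
    }
}
```

In the first call each instance ends up with one partition; in the second, both instances are listed with partitions 0 and 1, which is the "broadcast" behaviour across groups.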
EDIT: Based on your comment, which says:
I was wondering what is the effective difference between having 2 consumer threads under the same process as opposed to 2 consumer processes (group being the same in both cases)
The consumer group-id is the same/global across the cluster. Suppose you have started process-one with 2 threads and then spawn another process (maybe on a different machine) with the same groupId and 2 more threads. Kafka will then add these 2 new threads to the set consuming messages from the topic, so eventually there will be 4 threads responsible for consuming from the same topic. Kafka will then trigger a re-balance to re-assign partitions to threads, so a particular partition that was being consumed by thread T1 of process P1 may be reallocated to thread T2 of process P2. The lines below are taken from the wiki page:
When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.
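To make the effect of such a re-balance concrete, here is a small plain-Java sketch. It is a simplified model, not Kafka's code: the class RebalanceSketch, the thread ids such as P1-T1, and the range-style rule (sort the thread ids, then hand out contiguous blocks of partitions) are assumptions chosen for illustration. Real consumer ids are generated by Kafka, so the actual ordering may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of a re-balance: NOT Kafka's implementation. Names are hypothetical.
class RebalanceSketch {

    // Range-style split: sort thread ids, then give each thread a contiguous
    // block of partitions; the first (partitionCount % threads) threads get one extra.
    static Map<String, List<Integer>> assign(List<String> threadIds, int partitionCount) {
        List<String> sorted = new ArrayList<String>(threadIds);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new TreeMap<String, List<Integer>>();
        if (sorted.isEmpty()) {
            return result;
        }
        int perThread = partitionCount / sorted.size();
        int extra = partitionCount % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perThread + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<Integer>();
            for (int k = 0; k < count; k++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("P1-T1", "P1-T2");
        List<String> after = Arrays.asList("P1-T1", "P1-T2", "P2-T1", "P2-T2");

        // Hypothetical topic with 4 partitions: process P2 joining moves partitions 2 and 3.
        System.out.println(assign(before, 4)); // {P1-T1=[0, 1], P1-T2=[2, 3]}
        System.out.println(assign(after, 4));  // {P1-T1=[0], P1-T2=[1], P2-T1=[2], P2-T2=[3]}

        // The 2-partition topic from the question: two of the four threads stay idle.
        System.out.println(assign(after, 2));  // {P1-T1=[0], P1-T2=[1], P2-T1=[], P2-T2=[]}
    }
}
```

Note the last line: since a partition is never shared within a group, running more threads than partitions leaves the surplus threads without work, whether they live in one process or several.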