Kafka-streams: Why do all partitions get assigned to the same consumer in the consumer group?


Question

Background

Several machines generate events. These events get sent to our Kafka cluster, where each machine has its own topic (app.machine-events.machine-name). Because order is important on a per-machine basis, and partition-size is not an issue for now, all topics consist of a single partition. Therefore N topics also means N partitions, currently.

The consuming/processing app uses kafka-streams, with StreamsConfig.APPLICATION_ID_CONFIG ("application.id") set to 'machine-event-processor' on every instance, so all instances end up in the same Kafka consumer group. The consumers are subscribed to the pattern app.machine-events.*, since it does not matter to the processor which machine's events it handles. This is verified by ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group machine-event-processor --members --verbose, which shows a list matching the number and IPs of all running processing services.

Expectation

Given 20 machines and 5 instances of the processor, we'd expect each processor to handle ~4 partitions (and therefore ~4 topics).

Reality

There's one processor handling all 20 partitions (and therefore all 20 topics), while the other 4 processors do nothing at all/idle. When the 'lucky' processor is killed, all 20 partitions get rebalanced onto another single processor, which then handles all 20 partitions/topics while 3 processors idle.

What I've tried so far

  • Check out partition.grouper. I don't feel like I understand it completely, but as far as I can find, there's only the DefaultPartitionGrouper option anyway, and writing a custom one should not be necessary since (per the documentation) this setup should just work. It does mention that partitions get joined into a task based on their partition id (all 0 for us, as there's only one partition per topic), but I was not able to completely understand this part.
  • Used RoundRobinAssignor for the consumer: settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), new RoundRobinAssignor().getClass.getName) (tried several values, but nothing seemed to change; see the sketch after this list).
  • Check out other configuration properties, to see if I've missed something: None, I think.
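
For what it's worth, here's that assignor attempt in isolation (a sketch reusing the settings variable from the list item above). As far as I can tell, Kafka Streams treats partition.assignment.strategy as a non-configurable consumer setting and substitutes its own internal partition assignor, which would explain why changing this value never had any visible effect:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.streams.StreamsConfig

val settings = new Properties
// consumerPrefix produces the key "consumer.partition.assignment.strategy",
// matching the configuration dump in the simplified code below...
settings.put(
  StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
  classOf[RoundRobinAssignor].getName)
// ...but Streams replaces the strategy with its own assignor internally,
// so the override never reaches the group coordinator.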

Simplified code

import java.util.Properties
import java.util.regex.Pattern
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}

val streamConfig = new Properties
// Populated elsewhere; the effective configuration is:
// {producer.metadata.max.age.ms=5000, consumer.metadata.max.age.ms=5000, default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde, consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor, bootstrap.servers=kafka:9092, application.id=machine-event-processor, default.value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde}
val builder: StreamsBuilder = new StreamsBuilder
val topicStream: KStream[String, Array[Byte]] = builder.stream(Pattern.compile("app.machine-events.*"))
topicStream.process(new MessageProcessorSupplier(context)) // Each event is delegated to a processor containing the actual processing logic
val eventStreams = new KafkaStreams(builder.build(), streamConfig)
eventStreams.start()

Notes

  • Kafka-streams 2.0.0 is being used:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>

Kafka is being run inside a container, using the wurstmeister/kafka:2.11-2.0.0 version. The docker-stack.yml service:

kafka:
  image: wurstmeister/kafka:2.11-2.0.0
  ports:
    - target: 9094
      published: 9094
      protocol: tcp
      mode: host
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  healthcheck:
    test: ["CMD-SHELL", "$$(netstat -ltn | grep -q 9092)"]
    interval: 15s
    timeout: 10s
    retries: 5
  environment:
    HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 36000
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
    KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
    KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    KAFKA_DEFAULT_REPLICATION_FACTOR: 2
  deploy:
    replicas: 2
    restart_policy:
      condition: on-failure
      delay: 5s
      max_attempts: 3
      window: 120s

  • Kafka is set up as a dual-node cluster. Through the docker environment variables we've set the replication factor to 2, so each partition should have a replica on each node.

Related topics/questions/discussions I've found and checked

https://faust.readthedocs.io/en/Latest/developerguide/partition_assignor.html

Checked out the Kafka mail archives but did not find anything there

Checked out the streams example apps

Searched all around for others who ran into this issue, but found no answers I needed. Also found KAFKA-7144, but that should not be an issue for us as we're running 2.0.0

If anyone has run into similar issues, or is able to point out my probably very stupid mistake, please enlighten me!

Answer

For future readers running into this same issue: the solution was not to use N topics with 1 partition each, but 1 topic with N partitions. Even with, say, 120 partitions and 400+ machines/event-sources, events from multiple machines end up in the same partition, but this does not affect the per-machine ordering of the events. (As far as we understood it, this ties back to partition.grouper mentioned above: partitions with the same partition number across the pattern-matched topics are joined into a single task, and a task runs on exactly one instance, so twenty single-partition topics collapse into one task on one consumer.)

The implementation was to set the record key to the machine name and let the underlying logic take care of which partition each value goes to. Since we now have a consumer group with X consumers subscribed to this one topic, the partitions are divided evenly over the consumers, each taking 120/X partitions.
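
To illustrate the keying, a minimal sketch of the producing side under this scheme (the machine name "machine-001" and the payload are placeholder assumptions):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

val props = new Properties
props.put("bootstrap.servers", "kafka:9092")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[ByteArraySerializer].getName)

val producer = new KafkaProducer[String, Array[Byte]](props)
// The default partitioner hashes the record key, so all events of a given
// machine land in the same partition and keep their relative order.
val machineName = "machine-001"              // placeholder
val payload = "some-event".getBytes("UTF-8") // placeholder event payload
producer.send(new ProducerRecord("app.machine-events", machineName, payload))
producer.close()

On the consuming side nothing changes except subscribing to the single topic (builder.stream("app.machine-events")) instead of the pattern.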

This was as Matthias suggested, which was further confirmed by other helpful people from Confluent at Devoxx Belgium 2018. Thank you!

Tip

When using the wurstmeister/kafka docker image, consider using the environment property:

KAFKA_CREATE_TOPICS:"app.machine-events:120:2"

KAFKA_CREATE_TOPICS: "app.machine-events:120:2"

meaning:

topic-name:number-of-partitions:replication-factor
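
So KAFKA_CREATE_TOPICS: "app.machine-events:120:2" creates the topic app.machine-events with 120 partitions and replication factor 2 at container startup. If you create the topic by hand instead, the equivalent command with the stock Kafka 2.0 tooling (which still talks to ZooKeeper directly) should be something like:

./kafka-topics.sh --zookeeper zookeeper:2181 --create --topic app.machine-events --partitions 120 --replication-factor 2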
