Kafka-streams: Why do all partitions get assigned to the same consumer in the consumergroup?

Problem description

Background

Several machines generate events. These events get sent to our Kafka cluster, where each machine has its own topic (app.machine-events.machine-name). Because order is important on a per-machine basis, and partition-size is not an issue for now, all topics consist of a single partition. Therefore N topics also means N partitions, currently.

The consuming/processing app uses kafka-streams, and we've set StreamsConfig.APPLICATION_ID_CONFIG ("application.id") to 'machine-event-processor', which is the same for every instance, meaning they all get put into the same Kafka consumer group. This consumer is subscribed to the pattern app.machine-events.*, as it does not matter to the processor which machine's events it processes. This is verified by ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group machine-event-processor --members --verbose, which shows me a list matching the number & IPs of all running processing services.

Expected

Given 20 machines and 5 instances of the processor, we'd expect each processor to handle ~4 partitions (and therefore ~4 topics).

Actual

There is one processor handling all 20 partitions (and therefore 20 topics), with the 4 other processors doing nothing at all/idling. When the 'lucky' processor is killed, all 20 partitions get rebalanced to another processor, resulting in the new processor handling 20 partitions/topics and 3 processors idling.

What I've tried so far

  • Check out partition.grouper. I don't feel like I understand it completely, but as far as I'm able to find, there's only the DefaultPartitioner option anyway, and writing a custom one should not be necessary as (as per the documentation) this setup should work. It does mention that partitions get joined into a task based on their partition key (all 0 for us, as there's only one partition per topic), but I was not able to completely understand this part.
  • Used RoundRobinAssignor for the consumer: settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), new RoundRobinAssignor().getClass.getName) (Tried several values, as nothing seems to change; see the sketch after this list for how this was wired in.)
  • Check out other configuration properties, to see if I've missed something: None, I think.
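
For concreteness, the assignor attempt from the list above, wired into the Streams configuration, looked roughly like this (a minimal sketch; the application id and bootstrap servers are taken from the simplified code below, everything else is omitted):

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.streams.StreamsConfig

val settings = new Properties
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "machine-event-processor")
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
// Prefix the plain consumer property so Kafka Streams forwards it to its internal consumers
settings.put(
  StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
  classOf[RoundRobinAssignor].getName)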

Code, simplified

import java.util.Properties
import java.util.regex.Pattern
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}
import org.apache.kafka.streams.kstream.KStream

val streamConfig = new Properties
// Effective settings:
// {producer.metadata.max.age.ms=5000, consumer.metadata.max.age.ms=5000, default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde, consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor, bootstrap.servers=kafka:9092, application.id=machine-event-processor, default.value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde}
val builder: StreamsBuilder = new StreamsBuilder
val topicStream: KStream[String, Array[Byte]] = builder.stream(Pattern.compile("app.machine-events.*"))
topicStream.process(new MessageProcessorSupplier(context)) // The event is delegated to a processor, doing the actual processing logic
val eventStreams = new KafkaStreams(builder.build(), streamConfig)
eventStreams.start()

Notes

  • Kafka-streams 2.0.0 is being used:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>

Kafka is being run inside a container, using the wurstmeister/kafka:2.11-2.0.0 version. The docker-stack.yml service:

kafka:
  image: wurstmeister/kafka:2.11-2.0.0
  ports:
    - target: 9094
      published: 9094
      protocol: tcp
      mode: host
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  healthcheck:
    test: ["CMD-SHELL", "$$(netstat -ltn | grep -q 9092)"]
    interval: 15s
    timeout: 10s
    retries: 5
  environment:
    HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 36000
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
    KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
    KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    KAFKA_DEFAULT_REPLICATION_FACTOR: 2
  deploy:
    replicas: 2
    restart_policy:
      condition: on-failure
      delay: 5s
      max_attempts: 3
      window: 120s

  • Kafka is set up in a dual-node setup, forming a cluster. Through the docker environment variable we've set the replication factor to 2, so each partition should have a replica on each node.

Related topics/questions/discussions I found and checked

https://faust.readthedocs.io/en/latest/developerguide/partition_assignor.html

Checked out the Kafka mail archives but did not find anything there

Checked out the streams example applications

Searched all around for others who ran into this issue, but that did not give me the answers I need. Also found KAFKA-7144, but this should not be an issue for us as we're running 2.0.0.

If anyone has run into similar issues, or is able to point out my probably very stupid mistake, please enlighten me!

Answer

For future readers running into this same issue: the solution was not to use N topics each having 1 partition, but to use 1 topic with N partitions. Even with, say, 120 partitions and 400+ machines/event-sources, multiple event types will be put into the same partition, but this does not affect the order of the events.

The implementation was to set the record key to the machine name and to let the underlying logic take care of which value goes to which partition. Since we now have a consumer group with X consumers subscribed to this topic, the partitions are divided evenly over the consumers, each taking 120/X partitions.
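
To illustrate that keying scheme, a producer-side sketch could look like the following (the topic name app.machine-events and the machine name as key come from the answer above; the serializers, bootstrap address and the send helper are assumptions for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

val props = new Properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

val producer = new KafkaProducer[String, Array[Byte]](props)

// Keying by machine name makes the default partitioner hash all events of one machine
// into the same partition, so per-machine ordering is preserved.
def send(machineName: String, payload: Array[Byte]): Unit =
  producer.send(new ProducerRecord[String, Array[Byte]]("app.machine-events", machineName, payload))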

This was as Matthias suggested, which was further confirmed by other helpful people from Confluent at Devoxx Belgium 2018. Thank you!

Tip

When using the wurstmeister/kafka docker image, consider using the environment property:

KAFKA_CREATE_TOPICS: "app.machine-events:120:2"

Meaning

topic-name:number-of-partitions:replication-factor
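
If the image's topic-creation hook is not an option, the same layout can also be created from code; a rough sketch using the Kafka AdminClient (the bootstrap address kafka:9092 is an assumption):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val adminProps = new Properties
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
val admin = AdminClient.create(adminProps)

// app.machine-events with 120 partitions and replication factor 2, matching the value above
admin.createTopics(Collections.singleton(new NewTopic("app.machine-events", 120, 2.toShort))).all().get()
admin.close()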
