Existing internal topic has invalid partitions


Question

When starting our Kafka Streams application in a test setup with a single Kafka broker, we see the following error in roughly 1 out of 15 runs:

org.apache.kafka.streams.errors.StreamsException: Existing internal topic alarm-message-streams-by-organization-repartition has invalid partitions: expected: 32; actual: 12. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.

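When the error occurs, the actual number of partitions varies (expected is 32; actual is greater than 0 and less than 32).

We call org.apache.kafka.streams.KafkaStreams#cleanUp before calling org.apache.kafka.streams.KafkaStreams#start. For each test run, the Kafka broker is started without any data (using https://hub.docker.com/r/wurstmeister/kafka/).

Looking at the Kafka broker logs, we see the following: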

[2018-10-22 18:41:31,373] INFO Topic creation Map(
    alarm-message-streams-by-organization-repartition-19 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-22 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-0 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-7 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-23 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-1 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-24 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-2 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-30 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-5 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-21 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-8 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-14 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-15 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-6 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-16 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-31 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-25 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-9 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-20 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-29 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-13 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-26 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-17 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-4 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-10 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-3 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-11 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-12 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-28 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-27 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-18 -> ArrayBuffer(42)
) (kafka.zk.AdminZkClient)

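It looks like the topic was created with the expected number of partitions (32). Later in the same log there appears to be a second request to create the same topic. We do not know why that happens, but at least the request still contains the expected number of partitions (32):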

[2018-10-22 18:43:29,851] INFO [Admin Manager on Broker 42]: Error processing create topic request for topic alarm-message-streams-by-organization-repartition with arguments (numPartitions=32, replicationFactor=1, replicasAssignments={}, configs={cleanup.policy=delete, segment.bytes=52428800, segment.ms=600000, retention.ms=9223372036854775807, segment.index.bytes=52428800}) (kafka.server.AdminManager)
org.apache.kafka.common.errors.TopicExistsException: Topic 'alarm-message-streams-by-organization-repartition' already exists.

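We have never seen this in non-test environments, where we run with 6 Kafka brokers. However, we execute test runs far more often than we deploy to non-test environments.

Note: the topic that triggers the error is not always the same.

The error makes our test setup flaky, so we would like to understand why it happens and how to deal with it. Can anyone provide some insight into this Kafka Streams behavior?

We are using Kafka and Kafka Streams 2.0.0.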

Recommended answer

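It seems that the metadata received from the Kafka cluster (i.e., your single broker) is incomplete or incorrect. On startup (or, more precisely, on every rebalance), Kafka Streams checks whether the internal topics exist with the expected number of partitions. If a topic does not exist, it is created (this should happen only once in the lifetime of the application). If the topic exists with the correct number of partitions, it is used as is. If the topic exists with an incorrect number of partitions, the exception you report is thrown.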
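The three cases above can be sketched in plain Java. This is only an illustration of the decision described in this answer, not the Kafka Streams source code: the class `InternalTopicCheck`, the `Action` enum, and the `decide` method are hypothetical names, and the metadata map stands in for whatever the cluster reports.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is NOT the actual Kafka Streams implementation.
final class InternalTopicCheck {

    /** The three outcomes described in the answer. */
    enum Action { CREATE, REUSE, FAIL }

    /**
     * Decides what to do with one internal topic.
     *
     * @param reportedPartitions topic name -> partition count as reported by cluster metadata
     * @param topic              name of the internal topic to check
     * @param expected           partition count the application expects
     */
    static Action decide(Map<String, Integer> reportedPartitions, String topic, int expected) {
        Integer actual = reportedPartitions.get(topic);
        if (actual == null) {
            return Action.CREATE;   // topic not reported at all: create it
        }
        if (actual == expected) {
            return Action.REUSE;    // topic exists with the expected partition count
        }
        return Action.FAIL;         // partition count mismatch: the reported exception
    }

    public static void main(String[] args) {
        String topic = "alarm-message-streams-by-organization-repartition";
        Map<String, Integer> metadata = new HashMap<>();

        System.out.println(decide(metadata, topic, 32)); // no metadata for the topic yet

        metadata.put(topic, 12);                         // metadata lists only 12 partitions
        System.out.println(decide(metadata, topic, 32));

        metadata.put(topic, 32);                         // metadata lists all 32 partitions
        System.out.println(decide(metadata, topic, 32));
    }
}
```

Under this reading, a metadata response that lists only 12 of the 32 partitions ends up in the failing branch, even though the broker log shows the topic being created with 32 partitions.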

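Calling KafkaStreams#cleanUp() should not have any effect here: it only wipes the application's local state directory and does not touch topics on the broker. That makes it different from the StreamsResetter, which is invoked via bin/kafka-streams-application-reset.sh (see https://kafka.apache.org/20/documentation/streams/developer-guide/app-reset-tool.html).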

At the moment I do not know the root cause of the problem, that is, why Kafka Streams receives incorrect topic metadata. I hope this helps anyway.
