如果第一个经纪人关闭,卡夫卡消费者将无法消费 [英] Kafka consumer fails to consume if first broker is down

查看:106
本文介绍了如果第一个经纪人关闭,卡夫卡消费者将无法消费的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用最新版本的kafka(kafka_2.12-1.0.0.tgz).我设置了带有3个代理的简单集群(只是在每个实例的属性文件中更改了broker.id = 1和listeners = PLAINTEXT://:9092).集群启动后,我使用以下命令创建了主题

 <代码> ./kafka-topics.sh --create --zookeeper localhost:2181-复制因子3-分区13 --topic演示 

然后使用以下命令启动kafka消费者和生产者

 <代码> ./kafka-console-producer.sh --topic demo --broker-list本地主机:9094,本地主机:9093,本地主机:9092./kafka-console-consumer.sh --group test --bootstrap-server本地主机:9094,本地主机:9093,本地主机:9092-主题演示 

所有经纪人都工作起来,一切都很好.但是,如果我先杀死(按启动顺序),则将代理消息发送给代理,但使用者无法接收任何消息.消息不会丢失.启动该经纪人后,消费者立即收到消息.

关闭代理实例后的使用者日志:

[2018-01-09 13:39:31,130]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点2147483646的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:31,132]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点1的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:31,344]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点2147483646的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:31,451]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点1的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:31,848]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点2147483646的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:31,950]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点1的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:32,363]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点2147483646的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:33,092]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点2147483646的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:34,216]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点2147483646的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)[2018-01-09 13:39:34,218]警告[Consumer clientId = consumer-1,groupId = test]异步自动提交偏移量{demo-0 = OffsetAndMetadata {offset = 3,metadata =''},demo-1 = OffsetAndMetadata {offset = 3,元数据=''},demo-2 = OffsetAndMetadata {offset = 2,元数据=''},demo-3 = OffsetAndMetadata {offset = 2,元数据=''},demo-4 = OffsetAndMetadata {offset = 1,元数据=''},demo-5 = OffsetAndMetadata {offset = 1,元数据=''},demo-6 = OffsetAndMetadata {offset = 3,元数据=''},demo-7 = OffsetAndMetadata {offset = 2,元数据=''},demo-8 = OffsetAndMetadata {offset = 3,元数据=''},demo-9 = OffsetAndMetadata {offset = 2,元数据=''},demo-10 = OffsetAndMetadata {offset = 3,元数据=''},demo-11 = OffsetAndMetadata {offset = 2,元数据=''},demo-12 = OffsetAndMetadata {offset = 2,元数据=''}}失败:偏移提交失败,并发生可重试的异常.您应该重试提交抵消.潜在的错误是:协调器不可用.(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)[2018-01-09 13:39:34,219]警告[Consumer clientId = consumer-1,groupId = test]无法建立到节点1的连接.经纪人可能不可用.(org.apache.kafka.clients.NetworkClient)

再次启动缺少经纪人后的消费者日志:

[2018-01-09 13:41:21,739]错误[Consumer clientId = consumer-1,groupId = test]在分区demo-0的偏移量3处的偏移量提交失败:这不是正确的协调员.(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)[2018-01-09 13:41:21,739]警告[Consumer clientId = consumer-1,groupId = test]异步自动提交偏移量{demo-0 = OffsetAndMetadata {offset = 3,元数据=''},demo-1 = OffsetAndMetadata {offset = 3,元数据=''},demo-2 = OffsetAndMetadata {offset = 2,元数据=''},demo-3 = OffsetAndMetadata {offset = 2,元数据=''},demo-4 = OffsetAndMetadata {offset = 1,元数据=''},demo-5 = OffsetAndMetadata {offset = 1,元数据=''},demo-6 = OffsetAndMetadata {offset = 3,元数据=''},demo-7 = OffsetAndMetadata {offset = 2,元数据=''},demo-8 = OffsetAndMetadata {offset = 3,元数据=''},demo-9 = OffsetAndMetadata {offset = 2,元数据=''},demo-10 = OffsetAndMetadata {offset = 3,元数据=''},demo-11 = OffsetAndMetadata {offset = 2,元数据=''},demo-12 = OffsetAndMetadata {offset = 2,元数据=''}}失败:偏移提交失败,并发生可重试的异常.您应该重试提交抵消.潜在的错误是:这是不正确的协调员.(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)[2018-01-09 13:41:22,353]错误[Consumer clientId = consumer-1,groupId = test]在偏移量为3的分区demo-0上的偏移量提交失败:这不是正确的协调员.(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)[2018-01-09 13:41:22,354]警告[Consumer clientId = consumer-1,groupId = test]异步自动提交偏移量{demo-0 = OffsetAndMetadata {offset = 3,元数据=''},demo-1 = OffsetAndMetadata {offset = 3,元数据=''},demo-2 = OffsetAndMetadata {offset = 2,元数据=''},demo-3 = OffsetAndMetadata {offset = 2,元数据=''},demo-4 = OffsetAndMetadata {offset = 1,元数据=''},demo-5 = OffsetAndMetadata {offset = 1,元数据=''},demo-6 = OffsetAndMetadata {offset = 3,元数据=''},demo-7 = OffsetAndMetadata {offset = 2,元数据=''},demo-8 = OffsetAndMetadata {offset = 3,元数据=''},demo-9 = OffsetAndMetadata {offset = 2,元数据=''},demo-10 = OffsetAndMetadata {offset = 3,元数据=''},demo-11 = OffsetAndMetadata {offset = 3,元数据=''},demo-12 = OffsetAndMetadata {offset = 2,元数据=''}}失败:偏移提交失败,并发生可重试的异常.您应该重试提交抵消.潜在的错误是:这是不正确的协调员.(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

谢谢

解决方案

尝试检查server-*.properties文件中的"offsets.topic.replication.factor"

例如:

  ############################内部主题设置#组元数据内部主题的复制因子#对于开发测试以外的任何其他内容,建议使用大于1的值以确保可用性,例如3.offsets.topic.replication.factor = 3 

http://kafka.apache.org/documentation/#brokerconfigs

I'm using latest version of kafka(kafka_2.12-1.0.0.tgz). I have setup simple cluster with 3 brokers(just changed broker.id=1 and listeners=PLAINTEXT://:9092 in properties file for each instance).After cluster is up I created topic with the following command

./kafka-topics.sh --create    --zookeeper localhost:2181  --replication-factor 3     --partitions 13    --topic demo

then start kafka consumer and producers with the following commands

./kafka-console-producer.sh --topic  demo  --broker-list localhost:9094,localhost:9093,localhost:9092

./kafka-console-consumer.sh --group test --bootstrap-server localhost:9094,localhost:9093,localhost:9092  --topic demo

Everything is ok when all brokers are up. But if I kill first(by start order) broker messages are sent to brokers but consumer can not receive any message.Messages are not lost. After starting that broker consumer immediately receives message.

Logs of consumer after shutting down broker instance:

[2018-01-09 13:39:31,130] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,132] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,344] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,451] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,848] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,950] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:32,363] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:33,092] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,216] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,218] WARN [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, metadata=''}, demo-7=OffsetAndMetadata{offset=2, metadata=''}, demo-8=OffsetAndMetadata{offset=3, metadata=''}, demo-9=OffsetAndMetadata{offset=2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=2, metadata=''}, demo-12=OffsetAndMetadata{offset=2, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: The coordinator is not available. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:39:34,219] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Log of consumer after starting missing broker again:

[2018-01-09 13:41:21,739] ERROR [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition demo-0 at offset 3: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:21,739] WARN [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, metadata=''}, demo-7=OffsetAndMetadata{offset=2, metadata=''}, demo-8=OffsetAndMetadata{offset=3, metadata=''}, demo-9=OffsetAndMetadata{offset=2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=2, metadata=''}, demo-12=OffsetAndMetadata{offset=2, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,353] ERROR [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition demo-0 at offset 3: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,354] WARN [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, metadata=''}, demo-7=OffsetAndMetadata{offset=2, metadata=''}, demo-8=OffsetAndMetadata{offset=3, metadata=''}, demo-9=OffsetAndMetadata{offset=2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=3, metadata=''}, demo-12=OffsetAndMetadata{offset=2, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Thanks

解决方案

Try to check "offsets.topic.replication.factor" in server-*.properties file

For example:

############################# Internal Topic Settings       
# The replication factor for the group metadata internal topics    
# For anything other than development testing, a value greater than 1 is  recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3

http://kafka.apache.org/documentation/#brokerconfigs

这篇关于如果第一个经纪人关闭,卡夫卡消费者将无法消费的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆