All messages in out queue failed to be delivered to broker when kafka brokers restarted


Question

I am working with the C++ Kafka client librdkafka; the example I am based on is here: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker. During this process, I restarted the broker. Sometimes no messages failed to be delivered; sometimes about 100,000 messages failed to be delivered to the broker (queue.buffering.max.messages=100000). It seems that all the messages in the out queue were lost? The error is: RdKafka::Message delivery report: Local: Unknown partition.

I found new problems: (1) Sometimes about 200 messages are sent to the broker twice. (2) Sometimes a message was already sent to the broker, but dr_cb() was still called and reported that the message failed to be delivered. I am trying to figure out whether this is a problem with the broker or with the client. Has anyone seen similar problems? In fact, I need reliable transmission and delivery reports between the client and the broker server. I am considering using the C client now; I am not sure whether this problem will happen again there...

The broker log is:

[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)

[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.immutable.Set$Set1.map(Set.scala:73)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)

[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)

[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

My producer config is:

Global config

client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0

The consumer output is:

[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

Any suggestions are welcome. Thanks.

Answer

In the async mode, the client has to handle this kind of problem. I have no idea how to guarantee that the messages in the out queue are delivered to the broker with 100% certainty. What we can do is make sure the message gets into the out queue. If delivery fails, dr_cb() is called; in that callback, put the message back into the out queue and try to send it again. Maybe this is not the best way, but it is the way I am using now.
