All messages in the out queue failed to be delivered to the broker when the Kafka brokers restarted
Problem description
I am working on a C++ Kafka client based on librdkafka; the example I started from is https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker, and during this process I restarted the broker. Sometimes no messages fail to be delivered; other times about 100,000 messages fail, which matches queue.buffering.max.messages=100000. It looks as if every message in the out queue was lost. The reported error is: RdKafka::Message delivery report: Local: Unknown partition.
I found new problems: (1) Sometimes about 200 messages are sent to the broker twice. (2) Sometimes a message has already reached the broker, but dr_cb() is still called and reports that the message failed to be delivered. I am trying to figure out whether this is a broker problem or a client problem. Has anyone seen similar problems? I need reliable transmission and reliable delivery reports between the client and the broker. I am considering switching to the C client now, but I am not sure whether the same problem would happen there.
The log of the broker is:
[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.immutable.Set$Set1.map(Set.scala:73)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)
[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)
[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
My producer configuration is:
Global config:
client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0
The consumer output is:
[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
Any suggestions are welcome. Thanks.
Answer
In async mode, the client has to handle this kind of problem itself. I found no way to guarantee that every message in the out queue is delivered to the broker; what the client can do is keep track of each message it has handed to the out queue. When a delivery fails, dr_cb() is called, and in that callback the client can put the failed message into the out queue again. This may not be the best approach, but it is the one I am using now.