All messages in out queue failed to be delivered to broker when kafka brokers restarted
Problem description
I am working with the C++ Kafka client librdkafka; the example I am based on is here: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker. During this process, I restarted the broker. Sometimes no messages failed to be delivered to the broker; other times about 100,000 messages failed, with queue.buffering.max.messages=100000. It seems that all the messages in the out queue were lost? The error is RdKafka::Message delivery report: Local: Unknown partition.
I found new problems: (1) sometimes about 200 messages are sent to the broker twice; (2) sometimes a message has already reached the broker, yet dr_cb() is called and reports that the message failed to be delivered. I am trying to figure out whether this is a problem in the broker or in the client. Has anyone seen similar problems? I really need reliable transmission and delivery reports between the client and the broker server. I am considering using the C client now; I'm not sure whether this problem would happen again...
The broker's log is:
[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.immutable.Set$Set1.map(Set.scala:73)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)
[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
My producer config is:
Global config
client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0
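The settings in this dump most relevant to delivery during a broker restart are queue.buffering.max.messages, message.send.max.retries, retry.backoff.ms, message.timeout.ms, and request.required.acks. A more loss-averse variant might look like the fragment below; the property names all appear in the dump above, but the values here are illustrative assumptions, not a tested recommendation:

```ini
# Wait for all in-sync replicas to acknowledge, not just the partition leader.
request.required.acks=-1
# Retry transient failures (e.g. leader election while the broker restarts).
message.send.max.retries=10
retry.backoff.ms=500
# Give queued messages more time to survive the restart before timing out.
message.timeout.ms=300000
# A larger local queue so the producer can buffer through the outage.
queue.buffering.max.messages=500000
```

Note that higher acks and more retries trade throughput (and possible duplicates) for durability.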
The consumer output is:
[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
Any suggestions are welcome. Thanks.
Recommended answer
In async mode, the client has to handle this kind of problem itself. I have no idea how to guarantee that the messages in the out queue reach the broker with 100% certainty; what we can do is make sure a message stays in the out queue until it is delivered. When delivery fails, dr_cb() is called, and in that callback we put the message back into the out queue so it is sent again. Maybe this is not the best way, but it is the way I am using now.
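The re-enqueue-on-failure idea above can be sketched with a small self-contained model. FlakySender and drain_with_requeue are hypothetical stand-ins for the real produce call and the dr_cb() delivery report, not librdkafka API; the sketch only shows the queueing logic:

```cpp
#include <cassert>
#include <queue>
#include <string>

// Hypothetical stand-in for a produce call that can fail transiently,
// e.g. while the broker is restarting: the first `fail_count` sends fail.
struct FlakySender {
  int fail_count;
  bool send(const std::string &msg) {
    if (fail_count > 0) { --fail_count; return false; }
    return true;
  }
};

// Mirrors the answer's approach: a message leaves the outbound queue only
// when its (simulated) delivery report succeeds; on failure it is pushed
// back and retried later. Returns the number of delivered messages.
// NOTE: a real implementation should bound retries per message (compare
// message.send.max.retries / message.timeout.ms), or this loops forever
// on a permanent failure.
int drain_with_requeue(std::queue<std::string> &outq, FlakySender &sender) {
  int delivered = 0;
  while (!outq.empty()) {
    std::string msg = outq.front();
    outq.pop();
    if (sender.send(msg)) {
      ++delivered;       // delivery report: success
    } else {
      outq.push(msg);    // delivery report: failure -> re-enqueue
    }
  }
  return delivered;
}
```

One caveat this model makes visible: re-enqueueing on a failed delivery report can resend a message the broker actually stored (the report was lost, not the message), which is consistent with the ~200 duplicates observed in the question.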