All messages in out queue failed to be delivered to broker when kafka brokers restarted


Question


I am working with the C++ Kafka client librdkafka; the example I started from is https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker. During this process, I restarted the broker. Sometimes no messages failed to be delivered; sometimes about 100,000 messages failed to be delivered. queue.buffering.max.messages=100000. It seems that all the messages in the out queue were lost? The error is: RdKafka::Message delivery report: Local: Unknown partition.


I found new problems: (1) sometimes about 200 messages are sent to the broker twice; (2) sometimes a message was already sent to the broker, but dr_cb() is called and tells me that this message failed to be delivered. I am trying to figure out whether the problem is in the broker or in the client. Does anyone have similar problems? In fact, I need reliable transmission and delivery reports between the client and the broker. I am considering using the C client now. Not sure whether this problem will happen again...

The broker log is:

[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)

[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.immutable.Set$Set1.map(Set.scala:73)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)

[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)

[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

My producer configuration is:

Global config

client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0

The consumer output is:

[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)


Any suggestions are welcome. Thanks.

Answer


In async mode, the client should handle this kind of problem. I have no idea how to guarantee that messages in the out queue are delivered to the broker with 100% certainty. What we can do is make sure a message stays in the out queue until delivery is confirmed: if delivery fails, dr_cb() is called, and in that callback we put the message back into the out queue to try again. Maybe this is not the best way, but it is the way I am using now.
