在融合的Kafka Dotnet中丢失的消息 [英] Messages getting lost in Confluent Kafka Dotnet
问题描述
我在最近的c#项目中使用Confluent kafka软件包.我通过以下方式创建了生产者:
I am using Confluent kafka package in my recent c# project. I created a producer in the following way:
prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
foreach(msg in msglist){
using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
}
}
但是问题是我的某些消息没有传达给消费者.他们在某个地方迷路了.但是,如果我与生产者一起使用 await ,那么所有消息都将被传递.如何在不等待的情况下传递我的所有消息. (我有一个分区)
But the issue is that some of my messages are not getting to the consumer. They are getting lost somewhere. However, if I use await with the producer then all the messages are getting delivered. How to deliver all my messages without await. (I have a single partition)
推荐答案
首先,您应该只使用一个Producer
来发送您的msgList
,因为为每条消息创建一个新的Producer
确实非常昂贵
First of all, you should use only a single Producer
to send your msgList
, because creating a new Producer
for each message is really expensive.
您可以做的是将Produce()
方法与Flush()
一起使用.使用Produce()
,您将异步发送消息,而无需等待响应.然后调用Flush()
将阻塞,直到传递所有正在运行的消息为止.
What you could do, is use Produce()
method with Flush()
. With Produce()
you will send messages asynchronously without waiting for a response. And then calling a Flush()
will block until all in-flight messages are delivered.
var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();
foreach (var msg in msglist)
{
producer.Produce(topic, new Message<Null, string> { Value = msg });
}
producer.Flush();
如果没有await
或Flush()
,您可能会丢失消息,因为您的生产者可能在传递所有消息之前就被处置了.
Without await
or Flush()
you might lose messages because your producer might be disposed before all the messages are delivered.
这篇关于在融合的Kafka Dotnet中丢失的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!