消息在 Confluent Kafka Dotnet 中丢失 [英] Messages getting lost in Confluent Kafka Dotnet
问题描述
我在最近的 c# 项目中使用了 Confluent kafka 包.我通过以下方式创建了一个生产者:
I am using Confluent kafka package in my recent c# project. I created a producer in the following way:
prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
foreach(msg in msglist){
using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
}
}
但问题是我的一些消息没有传达给消费者.他们正在某处迷路.但是,如果我将 await 与生产者一起使用,那么所有消息都会被传递.如何在不等待的情况下传递我的所有消息.(我只有一个分区)
But the issue is that some of my messages are not getting to the consumer. They are getting lost somewhere. However, if I use await with the producer then all the messages are getting delivered. How to deliver all my messages without await. (I have a single partition)
推荐答案
首先,你应该只使用一个 Producer
来发送你的 msgList
,因为创建一个每个消息的新 Producer
真的很昂贵.
First of all, you should use only a single Producer
to send your msgList
, because creating a new Producer
for each message is really expensive.
您可以做的是将 Produce()
方法与 Flush()
结合使用.使用 Produce()
您将异步发送消息而无需等待响应.然后调用 Flush()
将阻塞,直到所有传输中的消息都被传递.
What you could do, is use Produce()
method with Flush()
. With Produce()
you will send messages asynchronously without waiting for a response. And then calling a Flush()
will block until all in-flight messages are delivered.
var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();
foreach (var msg in msglist)
{
producer.Produce(topic, new Message<Null, string> { Value = msg });
}
producer.Flush();
如果没有 await
或 Flush()
,您可能会丢失消息,因为您的生产者可能会在所有消息发送之前被处理掉.
Without await
or Flush()
you might lose messages because your producer might be disposed before all the messages are delivered.
这篇关于消息在 Confluent Kafka Dotnet 中丢失的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!