在使用BlockingCollection作为队列的使用者中等待异步方法 [英] awaiting async method in consumer using BlockingCollection as queue
问题描述
我正在处理一个服务器端控制台应用程序,该应用程序从多个WCF服务接收数据,对其进行一些处理,然后使用SignalR通过单个连接将结果转发到IIS服务器.
I am working on a server side console application which receives data from multiple WCF services, does some work on it and then forwards the results to an IIS server over a single connection using SignalR.
我尝试使用生产者使用者模式来实现这一点,其中WCF服务是生产者,而使用SignalR发送数据的类是使用者.对于队列,我使用了 BlockingCollection .
I tried to implement this using the producer consumer pattern where the WCF services are the producers and the class sending the data using SignalR is the consumer. For the queue I used the BlockingCollection.
但是,当使用await/async在使用者的while循环中发送数据时,卡住了,直到所有其他线程都已将数据添加到队列中为止.
However, when using await/async to send the data in the consumer's while loop gets stuck until all other threads have finished adding data to the queue.
出于测试目的,我用Task.Delay(1000).Wait();
或await Task.Delay(1000);
替换了实际发送数据的代码,这两个代码也都卡住了.
一个简单的Thread.Sleep(1000);
似乎可以正常工作,使我认为异步代码是问题所在.
For testing purposes I have replaced the code actually sending the data with a Task.Delay(1000).Wait();
or await Task.Delay(1000);
which both get stuck as well.
A simple Thread.Sleep(1000);
seems to work just fine, leading me to think the asynchronous code is the problem.
所以我的问题是:是否有某些因素阻止在while循环中完成异步代码?我想念什么?
So my question is: Is there something preventing the asynchronous code being completed in the while loop? What am I missing?
我正在像这样启动使用者线程:
I'm starting the consumer thread like this:
new Thread(Worker).Start();
以及消费者代码:
private void Worker()
{
while (!_queue.IsCompleted)
{
IMobileMessage msg = null;
try
{
msg = _queue.Take();
}
catch (InvalidOperationException)
{
}
if (msg != null)
{
try
{
Trace.TraceInformation("Sending: {0}", msg.Name);
Thread.Sleep(1000); // <-- works
//Task.Delay(1000).Wait(); // <-- doesn't work
msg.SentTime = DateTime.UtcNow;
Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
}
catch (Exception e)
{
TraceException(e);
}
}
}
}
推荐答案
正如支出者正确指出的那样,BlockingCollection
(顾名思义)仅用于阻塞代码,不适用于异步代码
As spender correctly pointed out, BlockingCollection
(as the name implies) is intended only for use with blocking code, and does not work so well with asynchronous code.
有异步兼容的生产者/消费者队列,例如BufferBlock<T>
.在这种情况下,我认为ActionBlock<T>
会更好:
There are async-compatible producer/consumer queues, such as BufferBlock<T>
. In this case, I would think ActionBlock<T>
would be even better:
private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg =>
{
try
{
Trace.TraceInformation("Sending: {0}", msg.Name);
await Task.Delay(1000);
msg.SentTime = DateTime.UtcNow;
Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
}
catch (Exception e)
{
TraceException(e);
}
});
这将替换整个消耗线程和主循环.
This replaces your entire consuming thread and main loop.
这篇关于在使用BlockingCollection作为队列的使用者中等待异步方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!