Preventing task from running on certain thread

Problem description

I have been struggling a bit with some async await stuff. I am using RabbitMQ for sending/receiving messages between some programs.

As a bit of background, the RabbitMQ client uses 3 or so threads that I can see: a connection thread and two heartbeat threads. Whenever a message is received via TCP, the connection thread handles it and calls a callback which I have supplied via an interface. The documentation says that it is best to avoid doing lots of work during this call, since it's done on the same thread as the connection and things need to continue on. They supply a QueueingBasicConsumer which has a blocking 'Dequeue' method which is used to wait for a message to be received.
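To make that concrete, here is a minimal sketch of the blocking pattern I'm describing (based on the older RabbitMQ.Client API; the queue name and ProcessMessage are placeholders):

//Blocking QueueingBasicConsumer pattern (older RabbitMQ.Client API).
//The consuming thread is blocked inside Dequeue() until a message arrives.
var consumer = new RabbitMQ.Client.QueueingBasicConsumer(channel);
channel.BasicConsume("my-queue", true, consumer); //true = noAck

while (true)
{
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = consumer.Queue.Dequeue();
    ProcessMessage(e); //placeholder for real message handling
}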

I wanted my consumers to be able to actually release their thread context during this waiting time so somebody else could do some work, so I decided to use async/await tasks. I wrote an AwaitableBasicConsumer class which uses TaskCompletionSources in the following fashion:

I have an awaitable dequeue method:

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    //we are enqueueing a TCS. This is a "read"
    rwLock.EnterReadLock();

    try
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

        //if we are cancelled before we finish, this will cause the tcs to become cancelled
        cancellationToken.Register(() =>
        {
            tcs.TrySetCanceled();
        });

        //if there is something in the undelivered queue, the task will be immediately completed
        //otherwise, we queue the task into deliveryTCS
        if (!TryDeliverUndelivered(tcs))
            deliveryTCS.Enqueue(tcs);

        return tcs.Task;
    }
    finally
    {
        rwLock.ExitReadLock();
    }
}

The callback which the RabbitMQ client calls fulfills the tasks. This is called from the context of the AMQP connection thread:

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    //we want nothing added while we remove. We also block until everybody is done.
    rwLock.EnterWriteLock();
    try
    {
        RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

        bool sent = false;
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            //once we manage to actually set somebody's result, we are done with handling this
            if (tcs.TrySetResult(e))
            {
                sent = true;
                break;
            }
        }

        //if nothing was sent, we queue up what we got so that somebody can get it later.
        /**
         * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
         * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
         * doing our thing here.
         */
        if (!sent)
        {
            undelivered.Enqueue(e);
        }
    }
    finally
    {
        rwLock.ExitWriteLock();
    }
}

rwLock is a ReaderWriterLockSlim. The two queues (deliveryTCS and undelivered) are ConcurrentQueues.

The problem:

Every once in a while, the method that awaits the dequeue method throws an exception. This would not normally be an issue since that method is also async and so it enters the "Exception" completion state that tasks enter. The problem comes in the situation where the task that calls DequeueAsync is resumed after the await on the AMQP Connection thread that the RabbitMQ client creates. Normally I have seen tasks resume onto the main thread or one of the worker threads floating around. However, when it resumes onto the AMQP thread and an exception is thrown, everything stalls. The task does not enter its "Exception state" and the AMQP Connection thread is left saying that it is executing the method that had the exception occur.

My main confusion here is why this doesn't work:

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards

ConsumerTaskState state = new ConsumerTaskState()
{
    Connection = connection,
    CancellationToken = cancellationToken
};

//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);

Here is the RunAsync method, set up for the test:

public async Task RunAsync()
{
    using (var channel = this.Connection.CreateModel())
    {
        ...
        AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
        var result = consumer.DequeueAsync(this.CancellationToken);

        //wait until we find something to eat
        await result;

    throw new NotImplementedException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
        ...
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}

Reading what I have written, I see that I may not have explained my problem very well. If clarification is needed, just ask.

My solutions that I have come up with are as follows:


  • Remove all Async/Await code and just use straight up threads and block. Performance will be decreased, but at least it won't stall sometimes
  • Somehow exempt the AMQP threads from being used for resuming tasks. I assume that they were sleeping or something and then the default TaskScheduler decided to use them. If I could find a way to tell the task scheduler that those threads are off limits, that would be great.

Does anyone have an explanation for why this is happening or any suggestions to solving this? Right now I am removing the async code just so that the program is reliable, but I really want to understand what is going on here.

Recommended answer

I first recommend that you read my async intro, which explains in precise terms how await will capture a context and use that to resume execution. In short, it will capture the current SynchronizationContext (or the current TaskScheduler if SynchronizationContext.Current is null).
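As a quick sketch of that rule:

//Sketch: where the code after an await resumes.
async Task ExampleAsync()
{
    //SynchronizationContext.Current (or the current TaskScheduler) is captured here.
    await Task.Delay(100);

    //With a UI/ASP.NET SynchronizationContext, this line resumes on that context.
    //With no context (console app, thread pool, or a raw thread like RabbitMQ's),
    //it resumes on a thread pool thread - or inline on the thread that completed
    //the awaited task, which is exactly the situation described below.
}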

The other important detail is that async continuations are scheduled with TaskContinuationOptions.ExecuteSynchronously (as @svick pointed out in a comment). I have a blog post about this but AFAIK it is not officially documented anywhere. This detail does make writing an async producer/consumer queue difficult.

The reason await isn't "switching back to the original context" is (probably) because the RabbitMQ threads don't have a SynchronizationContext or TaskScheduler - thus, the continuation is executed directly when you call TrySetResult because those threads look just like regular thread pool threads.
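If you do keep the hand-rolled TaskCompletionSource approach, here is a rough sketch of how to keep those inline continuations off the AMQP thread (using the tcs and e variables from your HandleBasicDeliver; RunContinuationsAsynchronously requires .NET 4.6 or later):

//Option 1 (.NET 4.6+): never run awaiters' continuations inline on the completing thread.
var tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>(
    TaskCreationOptions.RunContinuationsAsynchronously);

//Option 2 (older frameworks): complete the TCS from a thread pool thread so any
//inline continuation runs there instead of on the RabbitMQ connection thread.
Task.Run(() => tcs.TrySetResult(e));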

BTW, reading through your code, I suspect your use of a reader/writer lock and concurrent queues is incorrect. I can't be sure without seeing the whole code, but that's my impression.

I strongly recommend you use an existing async queue and build a consumer around that (in other words, let someone else do the hard part :). The BufferBlock<T> type in TPL Dataflow can act as an async queue; that would be my first recommendation if you have Dataflow available on your platform. Otherwise, I have an AsyncProducerConsumerQueue type in my AsyncEx library, or you could write your own (as I describe on my blog).

Here's an example using BufferBlock<T>:

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
    _queue.Post(e);
}

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    return _queue.ReceiveAsync(cancellationToken);
}

In this example, I'm keeping your DequeueAsync API. However, once you start using TPL Dataflow, consider using it elsewhere as well. When you need a queue like this, it's common to find other parts of your code that would also benefit from a dataflow approach. E.g., instead of having a bunch of methods calling DequeueAsync, you could link your BufferBlock to an ActionBlock.
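A rough sketch of that linking (HandleMessageAsync is a placeholder for whatever per-message processing you do; these types live in System.Threading.Tasks.Dataflow):

//Process each delivered message in an ActionBlock instead of calling DequeueAsync.
//HandleMessageAsync is a placeholder for your own handler.
var processor = new ActionBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>(
    e => HandleMessageAsync(e));

//Link the buffer to the processor; PropagateCompletion flows completion/faults downstream.
_queue.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });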
