Monitor.Pulse/Wait的异步版本 [英] Async version of Monitor.Pulse/Wait

查看:42
本文介绍了Monitor.Pulse/Wait的异步版本的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试优化与Monitor.Wait和Monitor.Pulse方法类似的异步版本(在基本功能上).这个想法是在异步方法上使用它.

I'm trying to optimize an async version of something similar (in basic funcionality) to the Monitor.Wait and Monitor.Pulse methods. The idea is to use this over an async method.

要求: 1)我有一个正在运行的任务,它负责等待有人向我的显示器发出脉冲. 2)该任务可能会计算复杂(即耗时)的操作.同时,可以不做任何事情多次调用pulse方法(因为主要任务已经在进行一些处理). 3)一旦完成主要任务,它将开始再次等待,直到另一个Pulse进入.

Requirements: 1) I have one Task running, that it is in charge of waiting until someone pulses my monitor. 2) That task may compute a complex (ie: time consuming) operation. In the meanwhile, the pulse method could be called several times without doing anything (as the main task is already doing some processing). 3) Once the main task finishes, it starts to Wait again until another Pulse comes in.

最糟糕的情况是Wait> Pulse> Wait> Pulse> Wait ...,但是通常每次等待我都会有十分之几的脉冲.

Worst case scenario is Wait>Pulse>Wait>Pulse>Wait..., but usually I have tenths/hundreds of pulses for every wait.

因此,我有以下课程(正在工作,但我认为可以根据我的要求对其进行一些优化)

So, I have the following class (working, but I think it can be optimized a bit based on my requirements)

internal sealed class Awaiter
{
    private readonly ConcurrentQueue<TaskCompletionSource<byte>> _waiting = new ConcurrentQueue<TaskCompletionSource<byte>>();

    public void Pulse()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryDequeue(out tcs))
        {
            tcs.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryPeek(out tcs))
        {
            return tcs.Task;
        }

        tcs = new TaskCompletionSource<byte>();
        _waiting.Enqueue(tcs);
        return tcs.Task;
    }
}

上述类的问题是我仅用于同步的行李.因为我将只从一个线程等待,所以实际上并不需要ConcurrentQueue,因为我始终只有一个线程.

The problem with the above class is the baggage I'm using just for synchronization. Since I will be waiting from one and only one thread, there is really no need to have a ConcurrentQueue, as I always have only one item in it.

因此,我将其简化了一点,并写了以下内容:

So, I simplified it a bit and wrote the following:

internal sealed class Awaiter2
{
    private readonly object _mutex = new object();
    private TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w == null)
            {
                return;
            }

            _waiting = null;
            w.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        var w = _waiting;
        if (w != null)
        {
            return w.Task;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w != null)
            {
                return w.Task;
            }

            w = _waiting = new TaskCompletionSource<byte>();
            return w.Task;
        }
    }
}

该新版本也可以正常工作,但是我仍然认为可以通过删除锁来对其进行更多优化.

That new version is also working ok, but I'm still thinking it can be optimized a bit more, by removing the locks.

我正在寻找有关如何优化第二个版本的建议.有什么想法吗?

I'm looking for suggestions on how I can optimize the second version. Any ideas?

推荐答案

因为您只有一个任务,可以将函数简化为

Because you only have one task ever waiting your function can be simplified to

internal sealed class Awaiter3
{
    private volatile TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }
        _waiting = null;
#if NET_46_OR_GREATER
        w.TrySetResult(1);
#else
        Task.Run(() => w.TrySetResult(1));
#endif

    }

    //This method is not thread safe and can only be called by one thread at a time.
    // To make it thread safe put a lock around the null check and the assignment,
    // you do not need to have a lock on Pulse, "volatile" takes care of that side.
    public Task Wait()
    {
        if(_waiting != null)
            throw new InvalidOperationException("Only one waiter is allowed to exist at a time!");

#if NET_46_OR_GREATER
        _waiting = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously);
#else
        _waiting = new TaskCompletionSource<byte>();
#endif
        return _waiting.Task;
    }
}

我确实改变了一种行为.如果使用的是.NET 4.6或更高版本,则使用#if NET_46_OR_GREATER块中的代码;如果使用的是else块,则使用代码.当您调用TrySetResult时,您可以同步运行延续,这可能导致Pulse()需要很长时间才能完成.通过在.NET 4.6中使用TaskCreationOptions.RunContinuationsAsynchronously或在Task.Run中将TrySetResult包装在4.6之前的版本中,将确保Puse()不会因继续执行任务而被阻止.

One behavior I did change. If you are using .NET 4.6 or newer use the code in the #if NET_46_OR_GREATER blocks, if under use the else blocks. When you call TrySetResult you could have the continuation synchronously run, this can cause Pulse() to take a long time to complete. By using TaskCreationOptions.RunContinuationsAsynchronously in .NET 4.6 or wrapping the TrySetResult in a Task.Run for pre 4.6 will make sure that Puse() is not blocked by the continuation of the task.

请参阅SO问题在编译时检测目标框架版本有关如何创建在您的代码中可用的NET_46_OR_GREATER定义的信息.

See the SO question Detect target framework version at compile time on how to make a NET_46_OR_GREATER definition that works in your code.

这篇关于Monitor.Pulse/Wait的异步版本的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆