如何等待在C#中一个单一的事件,具有超时和取消 [英] How to wait for a single event in C#, with timeout and cancellation

查看:1751
本文介绍了如何等待在C#中一个单一的事件,具有超时和取消的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以,我的要求是有我的功能等待第一个实例的事件动作< T> 从另一个类,而另一个线程来了,处理它在我的线程,让等待客户按超时中断或的CancellationToken

我想创建一个通用的功能,我可以重复使用。我设法创建办(我认为)我需要一对夫妇的选择,但双方似乎更复杂比我想像它应该必须的。

用法

只是要清楚,样品使用这个功能应该是这样的,其中 serialDevice 是吐在一个单独的线程事件:

  VAR eventOccurred = Helper.WaitForSingleEvent< StatusPacket>(
    的CancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket)
    A => serialDevice.StatusPacketReceived + = A,
    A => serialDevice.StatusPacketReceived - = A,
    5000,
    ()=> serialDevice.RequestStatusPacket());

选项1-ManualResetEventSlim

这个选项不坏,但 ManualResetEventSlim 是混乱的的Dispose 的处理比现在看来似乎应该。它给ReSharper的适合我正在访问结束之内修改/处置的东西,它是真正的难走,所以我甚至不能确定它是正确的。也许有我丢失的东西,可以打扫一下,这将是我的preference,但我没有看到它副手。这里的code。

 公共静态布尔WaitForSingleEvent< TEvent>(此的CancellationToken令牌,动作< TEvent>处理程序,动作<作用< TEvent>>认购,动作<作用< TEvent>>退订, INT msTimeout,行动初始= NULL)
{
    VAR eventOccurred = FALSE;
    VAR eventResult =默认(TEvent);
    变种O =新的对象();
    VAR苗条=新ManualResetEventSlim();
    动作< TEvent>的setResult =结果=>
    {
        锁定(O)//确保我们只得到第一个事件
        {
            如果(!eventOccurred)
            {
                eventResult =结果;
                eventOccurred = TRUE;
                // ReSharper的禁用AccessToModifiedClosure
                // ReSharper的禁用AccessToDisposedClosure
                如果(苗条!= NULL)
                {
                    slim.Set();
                }
                // ReSharper的恢复AccessToDisposedClosure
                // ReSharper的恢复AccessToModifiedClosure
            }
        }
    };
    订阅(的setResult);
    尝试
    {
        如果(初始化!= NULL)
        {
            初始化();
        }
        slim.Wait(msTimeout,令牌);
    }
    终于//确保退订异常的情况下,
    {
        退订(的setResult);
        锁定(O)//确保我们不会访问渺茫
        {
            slim.Dispose();
            纤薄= NULL;
        }
    }
    锁定(O)//确保我们的变量并不在事物中间得到改变
    {
        如果(eventOccurred)
        {
            处理器(eventResult);
        }
        返回eventOccurred;
    }
}

选项2轮询没有的WaitHandle

WaitForSingleEvent 此功能是干净多了。我能使用 ConcurrentQueue ,因此甚至不需要锁。但我只是不喜欢轮询功能睡眠,我看不出它周围的任何方式使用这种方法。我想在一个的WaitHandle ,而不是通过 Func键<布尔> 清理睡眠,但第二个我这样做,我已经得到了整个的Dispose 烂摊子重新收拾。

 公共静态布尔WaitForSingleEvent< TEvent>(此的CancellationToken令牌,动作< TEvent>处理程序,动作<作用< TEvent>>认购,动作<作用< TEvent>>退订, INT msTimeout,行动初始= NULL)
{
    变种Q =新ConcurrentQueue< TEvent>();
    订阅(q.Enqueue);
    尝试
    {
        如果(初始化!= NULL)
        {
            初始化();
        }
        token.Sleep(msTimeout,()=>!q.IsEmpty);
    }
    终于//确保退订异常的情况下,
    {
        退订(q.Enqueue);
    }
    TEvent eventResult;
    VAR eventOccurred = q.TryDequeue(出eventResult);
    如果(eventOccurred)
    {
        处理器(eventResult);
    }
    返回eventOccurred;
}公共静态无效的睡眠(这个的CancellationToken道理,诠释毫秒,Func键<布尔> exitCondition)
{
    VAR开始= DateTime.Now;
    而((DateTime.Now - 启动).TotalMilliseconds<!MS&放大器;&安培; exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.sleep代码(1);
    }
}

问题

我并不特别关心这两种解决方案,我也不是100%肯定不是他们都是100%正确。是这些解决方案比其他(惯用表达,效率等)更好的一方,还是有更简单的方法或内置的功能,以满足我需要在这里做什么?

更新:最好的答案至今

TaskCompletionSource 以下解决方案的修改。无需封长,锁或任何东西。看来pretty简单。这里的任何错误?

 公共静态布尔WaitForSingleEvent< TEvent>(此的CancellationToken令牌,动作< TEvent>的onEvent,动作<作用< TEvent>>认购,动作<作用< TEvent>>退订, INT msTimeout,行动初始= NULL)
{
    VAR TCS =新TaskCompletionSource< TEvent>();
    动作< TEvent>处理结果= = GT; tcs.TrySetResult(结果);
    VAR任务= tcs.Task;
    订阅(处理);
    尝试
    {
        如果(初始化!= NULL)
        {
            初始化();
        }
        task.Wait(msTimeout,令牌);
    }
    最后
    {
        退订(处理);
        //不要处理任务http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    如果(task.Status == TaskStatus.RanToCompletion)
    {
        的onEvent(task.Result);
        返回true;
    }
    返回false;
}

更新2:另一个伟大的解决方案

原来, BlockingCollection 作品就像 ConcurrentQueue ,但也有接受超时而取消标记方法。这个解决方案的一个好处是,它可以被更新,以使 WaitForNEvents 相当容易:

 公共静态布尔WaitForSingleEvent< TEvent>(此的CancellationToken令牌,动作< TEvent>处理程序,动作<作用< TEvent>>认购,动作<作用< TEvent>>退订, INT msTimeout,行动初始= NULL)
{
    变种Q =新BlockingCollection< TEvent>();
    动作< TEvent>添加=项目=> q.TryAdd(项目);
    订阅(添加);
    尝试
    {
        如果(初始化!= NULL)
        {
            初始化();
        }
        TEvent eventResult;
        如果(q.TryTake(出eventResult,msTimeout,令牌))
        {
            处理器(eventResult);
            返回true;
        }
        返回false;
    }
    最后
    {
        退订(添加);
        q.Dispose();
    }
}


解决方案

您可以使用接收到事件转换为可观察到的,然后来个任务,终于等到与你的令牌/暂停该任务。

一个优势这已超过任何现有的解决方案,是它调用取消该事件的线程的保证的,你的处理赢得了 T为调用两次。 (在你的第一个解决方案,您围绕此 tcs.TrySetResult 而不是 tcs.SetResult ,但总是很高兴摆脱TryDoSomething的,只是确保DoSomething的工作总是)。

另一个优势是code的简单性。它本质上是一行。所以,你甚至都不需要特别的独立功能。您可以内联,以便它更清晰准确的code做什么,你可以在主题的变化,而不需要一吨的可选参数(如您的选购初始化,或允许等待N个事件,或者免收超时在他们没有必要的情况下/注销)。和你同时拥有布尔的范围时返回VAL的的实际结果它的完成,如果这是作用的。

 使用System.Reactive.Linq;
使用System.Reactive.Threading.Tasks;
...
公共静态布尔WaitForSingleEvent< TEvent>(此的CancellationToken令牌,动作< TEvent>的onEvent,动作<作用< TEvent>>认购,动作<作用< TEvent>>退订,诠释msTimeout,行动初始= NULL){
    VAR任务= Observable.FromEvent(订阅,退订).FirstAsync()ToTask()。
    如果(初始化!= NULL){
        初始化();
    }
    尝试{
        VAR完成= task.Wait(msTimeout,令牌);
        如果(成品)的onEvent(task.Result);
        返回完成;
    }赶上(OperationCanceledException){返回false; }
}

So my requirement is to have my function wait for the first instance an event Action<T> coming from another class and another thread, and handle it on my thread, allowing the wait to be interrupted by either timeout or CancellationToken.

I want to create a generic function I can reuse. I managed to create a couple options that do (I think) what I need, but both seem more complicated than I'd imagine it should have to be.

Usage

Just to be clear, a sample use of this function would look like this, where serialDevice is spitting out events on a separate thread:

var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
    cancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket),
    a => serialDevice.StatusPacketReceived += a,
    a => serialDevice.StatusPacketReceived -= a,
    5000,
    () => serialDevice.RequestStatusPacket());

Option 1—ManualResetEventSlim

This option isn't bad, but the Dispose handling of the ManualResetEventSlim is messier than it seems like it should be. It gives ReSharper fits that I'm accessing modified/disposed things within the closure, and it's genuinely hard to follow so I'm not even sure it's correct. Maybe there's something I'm missing that can clean this up, which would be my preference, but I don't see it offhand. Here's the code.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var eventOccurred = false;
    var eventResult = default(TEvent);
    var o = new object();
    var slim = new ManualResetEventSlim();
    Action<TEvent> setResult = result => 
    {
        lock (o) // ensures we get the first event only
        {
            if (!eventOccurred)
            {
                eventResult = result;
                eventOccurred = true;
                // ReSharper disable AccessToModifiedClosure
                // ReSharper disable AccessToDisposedClosure
                if (slim != null)
                {
                    slim.Set();
                }
                // ReSharper restore AccessToDisposedClosure
                // ReSharper restore AccessToModifiedClosure
            }
        }
    };
    subscribe(setResult);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        slim.Wait(msTimeout, token);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(setResult);
        lock(o) // ensure we don't access slim
        {
            slim.Dispose();
            slim = null;
        }
    }
    lock (o) // ensures our variables don't get changed in middle of things
    {
        if (eventOccurred)
        {
            handler(eventResult);
        }
        return eventOccurred;
    }
}

Option 2—polling without a WaitHandle

The WaitForSingleEvent function here is much cleaner. I'm able to use ConcurrentQueue and thus don't even need a lock. But I just don't like the polling function Sleep, and I don't see any way around it with this approach. I'd like to pass in a WaitHandle instead of a Func<bool> to clean up Sleep, but the second I do that I've got the whole Dispose mess to clean up again.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new ConcurrentQueue<TEvent>();
    subscribe(q.Enqueue);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        token.Sleep(msTimeout, () => !q.IsEmpty);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(q.Enqueue);
    }
    TEvent eventResult;
    var eventOccurred = q.TryDequeue(out eventResult);
    if (eventOccurred)
    {
        handler(eventResult);
    }
    return eventOccurred;
}

public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
    var start = DateTime.Now;
    while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(1);
    }
}

The question

I don't particularly care for either of these solutions, nor am I 100% sure either of them are 100% correct. Is either one of these solutions better than the other (idiomaticity, efficiency, etc), or is there an easier way or built-in function to meet what I need to do here?

Update: Best answer so far

A modification of the TaskCompletionSource solution below. No long closures, locks, or anything required. Seems pretty straightforward. Any errors here?

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var tcs = new TaskCompletionSource<TEvent>();
    Action<TEvent> handler = result => tcs.TrySetResult(result);
    var task = tcs.Task;
    subscribe(handler);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        task.Wait(msTimeout, token);
    }
    finally
    {
        unsubscribe(handler);
        // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    if (task.Status == TaskStatus.RanToCompletion)
    {
        onEvent(task.Result);
        return true;
    }
    return false;
}

Update 2: Another great solution

Turns out that BlockingCollection works just like ConcurrentQueue but also has methods accepting a timeout and cancellation token. One nice thing about this solution is that it can be updated to make a WaitForNEvents fairly easily:

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new BlockingCollection<TEvent>();
    Action<TEvent> add = item => q.TryAdd(item);
    subscribe(add);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        TEvent eventResult;
        if (q.TryTake(out eventResult, msTimeout, token))
        {
            handler(eventResult);
            return true;
        }   
        return false;
    }
    finally
    {
        unsubscribe(add);
        q.Dispose();
    }
}

解决方案

You can use Rx to convert the event to an observable, then to a task, and finally wait on that task with your token/timeout.

One advantage this has over any of the existing solutions, is that it calls unsubscribe on the event's thread, ensuring that your handler won't be called twice. (In your first solution you work around this by tcs.TrySetResult instead of tcs.SetResult, but it's always nice to get rid of a "TryDoSomething" and simply ensure DoSomething always works).

Another advantage is the code's simplicity. It's essentially one line. So you don't even particularly need an independent function. You can inline it so that it's more clear what exactly your code does, and you can make variations on the theme without needing a ton of optional parameters (like your optional initializer, or allow waiting on N events, or foregoing timeouts/cancellation in instances where they're not necessary). And you'd have both the bool return val and the actual result in scope when it's finished, if that's useful at all.

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null) {
    var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
    if (initializer != null) {
        initializer();
    }
    try {
        var finished = task.Wait(msTimeout, token);
        if (finished) onEvent(task.Result);
        return finished;
    } catch (OperationCanceledException) { return false; }
}

这篇关于如何等待在C#中一个单一的事件,具有超时和取消的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆