非阻塞的重复发生的生产者/消费者通知执行 [英] Non blocking and reoccurring producer/consumer notifier implementation

查看:153
本文介绍了非阻塞的重复发生的生产者/消费者通知执行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

搜索辛苦了一块code这确实是我想要的,我很高兴。阅读和<一href="http://stackoverflow.com/questions/4238345/asynchronously-wait-for-taskt-to-complete-with-timeout">this帮助了很多东西。

我有一种情况,我需要一个单一的消费者由一个生产者得到通知时,新的数据是可用的,但也希望消费者被通知定期无论是否有新的数据可用的。 它是好的,如果将​​通知使用者以上所述重复出现的时间,但它不应该被通知较不频繁。

有可能的是,虽然消费者已经通知和工作关于'新数据多个通知发生。 (所以 SemaphoreSlim 是不是一个不错的选择)。 因此,消费者比生产者通知的速率慢,也不会排队后续的通知,他们只是重新信号相同的数据可用标志没有影响。

我也想消费者异步等待通知(不阻塞线程)。

我已经缝合在一起,下面的类,它环绕 TaskCompletionSource 并且还使用一个内部定时器。

 公共类PeriodicalNotifier:IDisposable的
{
    //需要一些虚拟的类型,因为TaskCompletionSource只有通用版
    内部结构VoidTypeStruct {}
    //始终重用这个分配
    私有静态VoidTypeStruct dummyStruct;

    私人TaskCompletionSource&LT; VoidTypeStruct&GT; internalCompletionSource;
    私人定时器reSendTimer;

    公共PeriodicalNotifier(INT autoNotifyIntervalMs)
    {
        internalCompletionSource =新TaskCompletionSource&LT; VoidTypeStruct&GT;();
        reSendTimer =新的定时器(_ =&GT;通知(),NULL,0,autoNotifyIntervalMs);
    }

    公共异步任务WaitForNotifictionAsync(的CancellationToken的CancellationToken)
    {
        使用(cancellationToken.Register(()=&GT; internalCompletionSource.TrySetCanceled()))
        {
            等待internalCompletionSource.Task;
            //重新创建 - 要能够在接下来的等待再次设置
            internalCompletionSource =新TaskCompletionSource&LT; VoidTypeStruct&GT;();
        }
    }

    公共无效的通知()
    {
        internalCompletionSource.TrySetResult(dummyStruct);
    }

    公共无效的Dispose()
    {
        reSendTimer.Dispose();
        internalCompletionSource.TrySetCanceled();
    }
}
 

这个类的用户可以做这样的事情:

 私人PeriodicalNotifier通知=新PeriodicalNotifier(100);

// ...在一些任务 - 这应该是非阻塞
而(一些条件)
{
    等待notifier.WaitForNotifictionAsync(_tokenSource.Token);
    //做一些工作?
}

// ...在某些线程,制作添加新数据
notifier.Notify();
 

效率对我很重要,该方案是一个高频率数据流的,所以我脑子里想的:

  • 等待的非阻隔性质。
  • 在我假设定时器比重建Task.Delay并取消它,如果它不是一个通知更有效。
  • TaskCompletionSource
  • 的休闲娱乐的关注

我的问题是:

  1. 请问我的code正确解决这个问题?任何隐藏的陷阱?
  2. 在我失去了一些琐碎的解决方案/现有的这个用例块?

更新:

我已经达到了除了重新实施一个更精简任务完成结构(如在的这里和<一href="http://stackoverflow.com/questions/11232604/efficient-signaling-tasks-for-tpl-completions-on-frequently-reoccuring-events?rq=1">here)我没有更多的优化,使。希望帮助任何人都希望在一个类似的方案。

解决方案
  1. 是的,你的实现是有意义的,但 TaskCompletionSource 娱乐应该是使用范围,否则老取消标记可以取消了新的 TaskCompletionSource
  2. 我想使用某种 <$ C $的C> AsyncManualResetEvent 结合定时器会更简单和更不容易出错。有一个与在 rel="nofollow"> Visual Studio的SDK非常好的空间。您需要安装SDK 然后引用< A HREF =htt​​p://msdn.microsoft.com/en-us/library/microsoft.visualstudio.threading.aspx相对=nofollow> Microsoft.VisualStudio.Threading 组装。下面是使用自己的<一实施href="http://msdn.microsoft.com/en-us/library/microsoft.visualstudio.threading.asyncmanualresetevent.aspx"相对=nofollow> AsyncManualResetEvent 具有相同的API:

 公共类PeriodicalNotifier:IDisposable的
{
    私人只读定时器_timer;
    私人只读AsyncManualResetEvent _asyncManualResetEvent;

    公共PeriodicalNotifier(时间跨​​度autoNotifyInterval)
    {
        _asyncManualResetEvent =新AsyncManualResetEvent();
        _timer =新的定时器(_ =&GT;通知(),空,TimeSpan.Zero,autoNotifyInterval);
    }

    公共异步任务WaitForNotifictionAsync(的CancellationToken的CancellationToken)
    {
        等待_asyncManualResetEvent.WaitAsync()WithCancellation(的CancellationToken)。
        _asyncManualResetEvent.Reset();
    }

    公共无效的通知()
    {
        _asyncManualResetEvent.Set();
    }

    公共无效的Dispose()
    {
        _timer.Dispose();
    }
}
 

您通过设置复位事件通知,异步等使用 WaitAsync ,使用 WithCancellation 扩展方法能够取消和然后重置该事件。多个通知通过设置相同的复位事件合并。

Searched hard for a piece of code which does what i want and i am happy with. Reading this and this helped a lot.

I have a scenario where i need a single consumer to be notified by a single producer when new data is available but would also like the consumer to be notified periodically regardless of if new data is available. It is fine if the consumer is notified more than the reoccurring period but it should not be notified less frequent.

It is possible that multiple notifications for 'new data' occur while the consumer is already notified and working. (So SemaphoreSlim was not a good fit). Hence, a consumer which is slower than the rate of producer notifications, would not queue up subsequent notifications, they would just "re-signal" that same "data available" flag without affect.

I would also like the consumer to asynchronously wait for the notifications (without blocking a thread).

I have stitched together the below class which wraps around TaskCompletionSource and also uses an internal Timer.

public class PeriodicalNotifier : IDisposable
{
    // Need some dummy type since TaskCompletionSource has only the generic version
    internal struct VoidTypeStruct { }
    // Always reuse this allocation
    private static VoidTypeStruct dummyStruct;

    private TaskCompletionSource<VoidTypeStruct> internalCompletionSource;
    private Timer reSendTimer;

    public PeriodicalNotifier(int autoNotifyIntervalMs)
    {
        internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled()))
        {
            await internalCompletionSource.Task;
            // Recreate - to be able to set again upon the next wait
            internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        }
    }

    public void Notify()
    {
        internalCompletionSource.TrySetResult(dummyStruct);
    }

    public void Dispose()
    {
        reSendTimer.Dispose();
        internalCompletionSource.TrySetCanceled();
    }
}

Users of this class can do something like this:

private PeriodicalNotifier notifier = new PeriodicalNotifier(100);

// ... In some task - which should be non-blocking
while (some condition)
{
    await notifier.WaitForNotifictionAsync(_tokenSource.Token);
    // Do some work...
}

// ... In some thread, producer added new data
notifier.Notify();

Efficiency is important to me, the scenario is of a high frequency data stream, and so i had in mind:

  • The non-blocking nature of the wait.
  • I assume Timer is more efficient than recreating Task.Delay and cancelling it if it's not the one to notify.
  • A concern for the recreation of the TaskCompletionSource

My questions are:

  1. Does my code correctly solve the problem? Any hidden pitfalls?
  2. Am i missing some trivial solution / existing block for this use case?

Update:

I have reached a conclusion that aside from re implementing a more lean Task Completion structure (like in here and here) i have no more optimizations to make. Hope that helps anyone looking at a similar scenario.

解决方案

  1. Yes, your implementation makes sense but the TaskCompletionSource recreation should be outside the using scope, otherwise the "old" cancellation token may cancel the "new" TaskCompletionSource.
  2. I think using some kind of AsyncManualResetEvent combined with a Timer would be simpler and less error-prone. There's a very nice namespace with async tools in the Visual Studio SDK by Microsoft. You need to install the SDK and then reference the Microsoft.VisualStudio.Threading assembly. Here's an implementation using their AsyncManualResetEvent with the same API:

public class PeriodicalNotifier : IDisposable
{
    private readonly Timer _timer;
    private readonly AsyncManualResetEvent _asyncManualResetEvent;

    public PeriodicalNotifier(TimeSpan autoNotifyInterval)
    {
        _asyncManualResetEvent = new AsyncManualResetEvent();
        _timer = new Timer(_ => Notify(), null, TimeSpan.Zero, autoNotifyInterval);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        await _asyncManualResetEvent.WaitAsync().WithCancellation(cancellationToken);
        _asyncManualResetEvent.Reset();
    }

    public void Notify()
    {
        _asyncManualResetEvent.Set();
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}

You notify by setting the reset event, asynchronously wait using WaitAsync, enable Cancellation using the WithCancellation extension method and then reset the event. Multiple notifications are "merged" by setting the same reset event.

这篇关于非阻塞的重复发生的生产者/消费者通知执行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆