接收的IObservable缓冲理顺事件连发 [英] Rx IObservable buffering to smooth out bursts of events

查看:140
本文介绍了接收的IObservable缓冲理顺事件连发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个产生事件爆发迅速可观察序列(即:五个事件后立刻另一种,那么长时间的延迟,然后又快速爆发事件等)。我想通过插入事件之间的短暂延迟来消除这些阵阵。试想一下,如下图为例:

I have an Observable sequence that produces events in rapid bursts (ie: five events one right after another, then a long delay, then another quick burst of events, etc.). I want to smooth out these bursts by inserting a short delay between events. Imagine the following diagram as an example:


Raw:      --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

我目前的做法是通过生成 Observable.Interval节拍器般的定时器(),当它的确定信号,从原始流中提取另一个事件。问题是,我无法弄清楚如何再结合该定时器与我生无缓冲可观察序列。

My current approach is to generate a metronome-like timer via Observable.Interval() that signals when it's ok to pull another event from the raw stream. The problem is that I can't figure out how to then combine that timer with my raw unbuffered observable sequence.

IObservable.Zip()靠近做我想要的,但它仅适用,只要原料流是生产活动比定时器更快。只要有原料流中的显著平静,定时器建立了一系列不希望的事件,然后立即配对与事件从原始流中的下一个突发的

IObservable.Zip() is close to doing what I want, but it only works so long as the raw stream is producing events faster than the timer. As soon as there is a significant lull in the raw stream, the timer builds up a series of unwanted events that then immediately pair up with the next burst of events from the raw stream.

在理想情况下,我想与产生我上面列出的bevaior下列函数签名的的IObservable扩展方法。现在想来,我的救援计算器:)

Ideally, I want an IObservable extension method with the following function signature that produces the bevaior I've outlined above. Now, come to my rescue StackOverflow :)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PS。我是全新的,以接收,所以我的道歉,如果这是一个平凡简单的问题...

PS. I'm brand new to Rx, so my apologies if this is a trivially simple question...

下面是我有不少问题最初的幼稚和简单的解决方案:

Here's my initial naive and simplistic solution that has quite a few problems:

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    Queue<T> q = new Queue<T>();
    source.Subscribe(x => q.Enqueue(x));
    return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}

与该第一个明显的问题是,IDisposable的返回由内订阅到原始源丢失,因此该订阅不能被终止。调用Dispose在此方法返回了IDisposable杀死计时器,而不是底层的原始事件饲料,现在是不必要的填充队列空无一人从队列中拉出事件。

The first obvious problem with this is that the IDisposable returned by the inner subscription to the raw source is lost and therefore the subscription can't be terminated. Calling Dispose on the IDisposable returned by this method kills the timer, but not the underlying raw event feed that is now needlessly filling the queue with nobody left to pull events from the queue.

的第二个问题是没有办法为例外或结束流将通知通过从原始事件流传播到所述缓冲流 - 订阅的原始源,当他们被忽略

The second problem is that there's no way for exceptions or end-of-stream notifications to be propogated through from the raw event stream to the buffered stream - they are simply ignored when subscribing to the raw source.

和最后但并非最不重要的,现在我已经得到了code定期不管是否真的有什么工作要做,这我倒是preFER避免在这个美好的新反应世界中醒来。

And last but not least, now I've got code that wakes up periodically regardless of whether there is actually any work to do, which I'd prefer to avoid in this wonderful new reactive world.

要解决我最初的简单的方法遇到的问题,我写了的更复杂的函数,它的表现很像 IObservable.Delay() (我用.net反射读取code,并用它作为我的功能的基础)。不幸的是,很多样板逻辑,如 AnonymousObservable 是system.reactive code以外的不能公开访问,所以我不得不复制并粘贴很多 code的。该解决方案似乎工作,但鉴于其复杂性,我不太相信其无bug。

To solve the problems encountered in my initial simplistic approach, I wrote a much more complicated function that behaves much like IObservable.Delay() (I used .NET Reflector to read that code and used it as the basis of my function). Unfortunately, a lot of the boilerplate logic such as AnonymousObservable is not publicly accessible outside the system.reactive code, so I had to copy and paste a lot of code. This solution appears to work, but given its complexity, I'm less confident that its bug free.

我简直不敢相信,没有办法做到这一点使用标准无扩展的某种组合。我讨厌的感觉就像我不必要重新发明轮子,而我试图建立的格局似乎是一个相当标准的。

I just can't believe that there isn't a way to accomplish this using some combination of the standard Reactive extensions. I hate feeling like I'm needlessly reinventing the wheel, and the pattern I'm trying to build seems like a fairly standard one.

推荐答案

这实际上是<一个副本href=\"http://stackoverflow.com/questions/4123178/a-way-to-push-buffered-events-in-even-intervals/4124273#4124273\">A方式推缓存的事件,即使间隔,但我会在这里有一个总结(原来看起来很混乱,因为它着眼于几个备选方案)。

This is actually a duplicate of A way to push buffered events in even intervals, but I'll include a summary here (the original looks quite confusing because it looks at a few alternatives).

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Drain(x => 
        Observable.Empty<int>()
            .Delay(minDelay)
            .StartWith(x)
    );
}

我流失的实施就像的SelectMany ,它除了等待previous输出第一完成(你可以把它看成 ConactMany ,而的SelectMany 更像 MergeMany )。内置的排水不以这种方式工作,所以你需要包括下面的实​​现:

My implementation of Drain works like SelectMany, except it waits for the previous output to finish first (you could think of it as ConactMany, whereas SelectMany is more like MergeMany). The built-in Drain does not work this way, so you'll need to include the implementation below:

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

这篇关于接收的IObservable缓冲理顺事件连发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆