如何使用 Reactive Extensions 使用最大窗口大小来限制事件? [英] How can I use Reactive Extensions to throttle Events using a max window size?

查看:22
本文介绍了如何使用 Reactive Extensions 使用最大窗口大小来限制事件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

场景:

我正在构建一个 UI 应用程序,它每隔几毫秒从后端服务获取一次通知.收到新通知后,我想尽快更新 UI.

I am building a UI application that gets notifcations from a backend service every few milliseconds. Once I get a new notification i want to update the UI as soon as possible.

因为我可以在短时间内收到大量通知,而且我总是只关心最新事件,所以我使用 Reactive Extensions 框架的 Throttle() 方法.这让我可以忽略紧跟在新通知之后的通知事件,从而让我的 UI 保持响应.

As I can get lots of notifications within a short amount of time, and as I always only care about the latest event, I use the Throttle() method of the Reactive Extensions framework. This allows me to ignore notification events that are immediately followed by a new notification and so my UI stays responsive.

问题:

假设我将通知事件的事件流限制为 50 毫秒,后端每 10 毫秒发送一次通知,Thottle() 方法将永远不会返回事件,因为它会一次又一次地重置其滑动窗口.在这里,我需要一些额外的行为来指定诸如超时之类的东西,以便在事件吞吐量如此高的情况下,我可以每秒至少检索一个事件.我如何使用 Reactive Extensions 做到这一点?

Say I throttle the event stream of notification events to 50ms and the backend sends a notification every 10ms, the Thottle() method will never return an event as it keeps resetting its Sliding Window again and again. Here i need some additional behaviour to specify something like a timeout, so that i can retrieve atleast one event per second or so in case of such a high throughput of events. How can i do this with Reactive Extensions?

推荐答案

正如 James 所说,Observable.Sample 会给你产生的最新值.但是,它将在计时器上执行此操作,而不是根据油门中的第一个事件发生的时间.然而,更重要的是,如果您的采样时间很长(比如 10 秒),并且您的事件在采样后立即触发,那么您将在近 10 秒内不会收到该新事件.

As James stated, Observable.Sample will give you the latest value yielded. However, it will do so on a timer, and not in accordance to when the first event in the throttle occurred. More importantly, however, is that if your sample time is high (say ten seconds), and your event fires right after a sample is taken, you won't get that new event for almost ten seconds.

如果您需要更紧密的东西,则需要实现自己的功能.我冒昧这样做了.这段代码肯定可以进行一些清理,但我相信它可以满足您的要求.

If you need something a little tighter, you'll need to implement your own function. I've taken the liberty of doing so. This code could definitely use some clean up, but I believe it does what you've asked for.

public static class ObservableEx
{
    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
    {
        return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
    }

    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            var hasValue = false;
            T value = default(T);

            var maxTimeDisposable = new SerialDisposable();
            var dueTimeDisposable = new SerialDisposable();

            Action action = () =>
            {
                if (hasValue)
                {
                    maxTimeDisposable.Disposable = Disposable.Empty;
                    dueTimeDisposable.Disposable = Disposable.Empty;
                    o.OnNext(value);
                    hasValue = false;
                }
            };

            return source.Subscribe(
                x =>
                {
                    if (!hasValue)
                    {
                        maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action);
                    }

                    hasValue = true;
                    value = x;
                    dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action);
                },
                o.OnError,
                o.OnCompleted
            );
        });
    }
}

还有一些测试...

[TestClass]
public class ThrottleMaxTests : ReactiveTest
{
    [TestMethod]
    public void CanThrottle()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1)
            );
    }

    [TestMethod]
    public void CanThrottleWithMaximumInterval()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(175, 2),
            OnNext(250, 3),
            OnNext(325, 4),
            OnNext(400, 5)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(350, 4),
            OnNext(500, 5)
            );
    }

    [TestMethod]
    public void CanThrottleWithoutMaximumIntervalInterferance()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(325, 2)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(425, 2)
            );
    }
}

这篇关于如何使用 Reactive Extensions 使用最大窗口大小来限制事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆