自定义 Rx 运算符仅在有最近值时才进行节流 [英] Custom Rx operator for throttling only when there's a been a recent value

查看:23
本文介绍了自定义 Rx 运算符仅在有最近值时才进行节流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个看起来非常有用的 Rx 运算符,但令人惊讶的是,我在 Stackoverflow 上没有发现任何精确匹配的问题.我想在 Throttle 上创建一个变体,如果有一段时间不活动,它会让值立即通过.我想象的用例是这样的:

I'm trying to create an Rx operator that seems pretty useful, but I've suprisingly not found any questions on Stackoverflow that match precisely. I'd like to create a variation on Throttle that lets values through immediately if there's been a period of inactivity. My imagined use case is something like this:

我有一个下拉列表,可以在值更改时启动 Web 请求.如果用户按住箭头键并在值之间快速循环,我不想启动对每个值的请求.但是,如果我对流进行节流,那么用户每次只需以正常方式从下拉列表中选择一个值时,就必须等待节流持续时间.

I have a dropdown that kicks off a web request when the value is changed. If the user holds down the arrow key and cycles rapidly through the values, I don't want to kick off a request for each value. But if I throttle the stream then the user has to wait out the throttle duration every time they just select a value from the dropdown in the normal manner.

因此,正常的 Throttle 看起来像这样:

So whereas a normal Throttle looks like this:

我想创建如下所示的 ThrottleSubsequent:

I want to create ThrottleSubsequent that look like this:

请注意,弹珠 1、2 和 6 会立即通过,因为它们都经过一段时间的不活动.

Note that marbles 1, 2, and 6 are passed through without delay because they each follow a period of inactivity.

我对此的尝试如下所示:

My attempt at this looks like the following:

public static IObservable<TSource> ThrottleSubsequent<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
    // Create a timer that resets with each new source value
    var cooldownTimer = source
        .Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer
        .Switch(); // Switch to the most recent timer

    var cooldownWindow = source.Window(() => cooldownTimer);

    // Pass along the first value of each cooldown window immediately
    var firstAfterCooldown = cooldownWindow.SelectMany(o => o.Take(1));

    // Throttle the rest of the values 
    var throttledRest = cooldownWindow
        .SelectMany(o => o.Skip(1))
        .Throttle(dueTime, scheduler);

    return Observable.Merge(firstAfterCooldown, throttledRest);
}

似乎有效,但我很难对此进行推理,我觉得这里有一些边缘情况,其中的事情可能会因重复值或其他原因而变得棘手.我想从更有经验的 Rx-ers 那里得到一些关于这段代码是否正确的反馈,和/或是否有更惯用的方法来做到这一点.

This seems to work, but I'm having a difficult time reasoning about this, and I get the feeling there are some edge cases here where things might get screwy with duplicate values or something. I'd like to get some feedback from more experienced Rx-ers as to whether or not this code is correct, and/or whether there is a more idiomatic way of doing this.

推荐答案

好吧,这是一个测试套件(使用 nuget Microsoft.Reactive.Testing):

Well, here's a test suite (using nuget Microsoft.Reactive.Testing):

var ts = new TestScheduler();
var source = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('D')),
    new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('E')),
    new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('F')),
    new Recorded<Notification<char>>(760.MsTicks(), Notification.CreateOnNext('G'))
);

var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts);
var expectedResults = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(450.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(910.MsTicks(), Notification.CreateOnNext('G'))
);

var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();

ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);

和使用

public static class TestingHelpers
{
    public static long MsTicks(this int i)
    {
        return TimeSpan.FromMilliseconds(i).Ticks;
    }
}

好像通过了.如果你想减少它,你可以把它变成这样:

Seems to pass. If you wanted to reduce it, you could turn it into this:

public static IObservable<TSource> ThrottleSubsequent2<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
    return source.Publish(_source => _source
        .Window(() => _source
            .Select(x => Observable.Interval(dueTime, scheduler))
            .Switch()
        ))
        .Publish(cooldownWindow =>
            Observable.Merge(
                cooldownWindow
                    .SelectMany(o => o.Take(1)),
                cooldownWindow
                    .SelectMany(o => o.Skip(1))
                    .Throttle(dueTime, scheduler)
            )
        );
}

<小时>

编辑:

Publish 强制共享订阅.如果您有一个糟糕的(或昂贵的)源可观察到订阅副作用,Publish 确保您只订阅一次.以下是 Publish 有帮助的示例:

Publish forces sharing of a subscription. If you have a bad (or expensive) source observable with subscription side-effects, Publish makes sure you only subscribe once. Here's an example where Publish helps:

void Main()
{
    var source = UglyRange(10);
    var target = source
        .SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i)))
        .ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) //Works with ThrottleSubsequent2, fails with ThrottleSubsequent
        .Subscribe(i => Console.WriteLine(i));
}
static int counter = 0;
public IObservable<int> UglyRange(int limit)
{
    var uglySource = Observable.Create<int>(o =>
    {
        if (counter++ == 0)
        {
            Console.WriteLine("Ugly observable should only be created once.");
            Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i));
        }
        else
        {
            Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created.");
            o.OnError(new Exception($"observable invoked {counter} times."));
        }
        return Disposable.Empty;
    });
    return uglySource;
}

这篇关于自定义 Rx 运算符仅在有最近值时才进行节流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆