“限速"的最佳方式是什么?消耗一个 Observable? [英] What is the best way to "rate limit" consuming of an Observable?

查看:25
本文介绍了“限速"的最佳方式是什么?消耗一个 Observable?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一堆事件进来,我必须毫无损失地执行所有事件,但我想确保它们在适当的时间段被缓冲和消耗.有人有解决办法吗?

I have a bunch of events coming in and I have to execute ALL of them without a loss, but I want to make sure that they are buffered and consumed at the appropriate time slots. Anyone have a solution?

我在 Rx 中找不到任何可以在不丢失事件的情况下做到这一点的运算符(节流 - 丢失事件).我也考虑过 Buffered、Delay 等...找不到好的解决方案.

I can't find any operators in Rx that can do that without the loss of the events (Throttle - looses events). I've also considered Buffered, Delay, etc... Can't find a good solution.

我试图在中间放置一个计时器,但不知何故它根本不起作用:

I've tried to put a timer in the middle, but somehow it doesn't work at all:

GetInitSequence()
            .IntervalThrottle(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        return Observable.Create<T>(o =>
            {
                return source.Subscribe(x =>
                    {
                        new Timer(state => 
                            o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
                    }, o.OnError, o.OnCompleted);
        });
    }

推荐答案

问题不是 100% 清楚,所以我做了一些假设.

The question is not 100% clear so I'm making some presumptions.

Observable.Delay 不是您想要的,因为这会在每个事件到达时产生延迟,而不是创建用于处理的均匀时间间隔.

Observable.Delay is not what you want because that will create a delay from when each event arrives, rather than creating even time intervals for processing.

Observable.Buffer 不是您想要的,因为这会导致每个给定时间间隔内的所有事件都传递给您,而不是一次一个.

Observable.Buffer is not what you want because that will cause all events in each given interval to be passed to you, rather than one at a time.

所以我相信您正在寻找一种解决方案,它可以创建某种节拍器,该节拍器可以滴答作响,并为您提供每个滴答声的事件.这可以天真构建,使用 Observable.Interval 作为节拍器和 Zip 将其连接到您的源:

So I believe you're looking for a solution that creates some sort of metronome that ticks away, and gives you an event per tick. This can be naively constructed using Observable.Interval for the metronome and Zip for connecting it to your source:

var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));    
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));

这将每 5 秒触发一次(在上面的示例中),并按顺序为您提供原始项目.

This will trigger every 5 seconds (in the example above), and give you the original items in sequence.

这个解决方案的唯一问题是,如果你在(比如)10 秒内没有更多的源元素,当源元素到达时,它们将被立即发送,因为一些触发"事件就在那里等着他们.该场景的大理石图:

The only problem with this solution is that if you don't have any more source elements for (say) 10 seconds, when the source elements arrive they will be immediately sent out since some of the 'trigger' events are sitting there waiting for them. Marble diagram for that scenario:

source:  -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result:  ----a----b----c-------------d-e-f-g

这是一个非常合理的问题.这里已经有两个问题可以解决这个问题:

This is a very reasonable issue. There are two questions here already that tackle it:

Rx IObservable 缓冲以平滑突发事件

一种以均匀间隔推送缓冲事件的方法

提供的解决方案是主要的Drain 扩展方法和辅助的Buffered 扩展.我已经将它们修改得更加简单(不需要 Drain,只需使用 Concat).用法是:

The solution provided is a main Drain extension method and secondary Buffered extension. I've modified these to be far simpler (no need for Drain, just use Concat). Usage is:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));

扩展方法StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}

这篇关于“限速"的最佳方式是什么?消耗一个 Observable?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆