流程接收事件,在固定的或最小间隔 [英] Process Rx events at fixed or minimum intervals

查看:113
本文介绍了流程接收事件,在固定的或最小间隔的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有这种情况发生的每10-1000毫秒的事件序列。我同意这一事件的来源,但要处理他们在500ms的固定(或最少)的时间间隔。 我也想处理一个事件的时候,而不是在批(如缓冲区(X> 1))。

I have a sequence of events that happen every 10-1000 ms. I subscribe to this source of events, but want to handle them at a fixed (or minimum) interval of 500ms. I also want to process ONE event at a time, not in batches (like Buffer(x > 1)).

像这样的伪code:

observable.MinimumInterval(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...);

试过例如:

observable.Buffer(1).Delay(TimeSpan.FromMiliseconds(500).Subscribe(v=>...);

和很多其他可能的解决方案。至今没有运气。

and a lot of other potential solutions. No luck so far.

任何想法?

推荐答案

我回答过这个问题放在这里

再生(在链路腐!)用加入了presenting作为扩展方法:

Reproducing (in case of link rot!) with the addition of presenting as an extension method:

有时候,你想限制在该事件从接收数据流到达率。

Sometimes, you want to limit the rate at which events arrive from an Rx stream.

油门操作员坐席preSS的事件,如果在指定时间内到达另一个。这是在许多情况下非常有用,但它确实有两个重要的副作用 - 甚至是UNSUP pressed活动将由区间被延迟,且事件将被完全放弃,如果他们也快速到达

The Throttle operator will suppress an event if another arrives within a specified interval. This is very useful in many instances, but it does have two important side-effects – even an unsuppressed event will be delayed by the interval, and events will get dropped altogether if they arrive too quickly.

我碰到的情况是这两个是不能接受的。在这种特殊情况下,所期望的行为如下:该事件应在由时间跨度指定的最大速率输出,但在其他方面,尽快

I came across a situation where both of these were unacceptable. In this particular case, the desired behaviour was as follows: The events should be output at a maximum rate specified by a TimeSpan, but otherwise as soon as possible.

一个解决办法是这样工作的。试想一下,我们的输入流是一堆到达火车站的人。对于我们的产量,我们希望人们离开车站以最大速率。我们通过使每个人站在一个平板铁路货车的前部和发送该卡车出了站以固定的速度设定的最大速率。因为只有一个轨道,并且所有卡车以相同的速度行进,并有相同的长度,人们将离开车站以最大速率时卡车出发回到后端。但是,如果轨道是明确的,旁边的人就能够立即离开。

One solution works like this. Imagine our input stream is a bunch of people arriving at a railway station. For our output, we want people leave the station at a maximum rate. We set the maximum rate by having each person stand at the front of a flatbed railroad truck and sending that truck out of the station at a fixed speed. Because there is only one track, and all trucks travel at the same speed and have the same length, people will leave the station at a maximum rate when trucks are departing back-to-back. However, if the track is clear, the next person will be able to depart immediately.

那么,我们如何把这种比喻为接收?

So how do we translate this metaphor into Rx?

我们将使用Concat的运营商的接受能力流流,并把它们合并起来背到后端 - 就像发送铁路货车沿着轨道

We will use the Concat operator’s ability to accept a stream of streams and merge them together back-to-back – just like sending railroad trucks down the track.

要获得每个人相当于到铁路货车,我们将使用一个选择项目中的每个与单个OnNext事件(的人)开始,有结束的事件(人)可观察序列(铁路货车)的onComplete完全定义的时间间隔后。

To get the equivalent of each person onto a railroad truck, we will use a Select to project each event (person) to an observable sequence (railroad truck) that starts with a single OnNext event (the person) and ends with an OnComplete exactly the defined interval later.

让我们假设输入事件是一个的IObservable在变量输入。这里的code:

Lets assume the input events are an IObservable in the variable input. Here’s the code:

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

作为扩展方法这将成为:

As an extension method this becomes:

public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
    return source.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

}

这篇关于流程接收事件,在固定的或最小间隔的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆