反应性扩展滑动时间窗口 [英] reactive extensions sliding time window
本文介绍了反应性扩展滑动时间窗口的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我要输入一系列的股票报价,我想在最后一小时获取所有数据并对其进行一些处理.我正在尝试通过反应性扩展2.0实现这一目标.我在另一篇文章中阅读了使用Interval的内容,但我认为这已被弃用.
I have a sequence of stock ticks coming in and I want to take all the data in the last hour and do some processing on it. I am trying to achieve this with reactive extensions 2.0. I read on another post to use Interval but i think that is deprecated.
推荐答案
此扩展方法能否解决您的问题?
Would this extension method solve your problem?
public static IObservable<T[]> RollingBuffer<T>(
this IObservable<T> @this,
TimeSpan buffering)
{
return Observable.Create<T[]>(o =>
{
var list = new LinkedList<Timestamped<T>>();
return @this.Timestamp().Subscribe(tx =>
{
list.AddLast(tx);
while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
{
list.RemoveFirst();
}
o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
}, ex => o.OnError(ex), () => o.OnCompleted());
});
}
这篇关于反应性扩展滑动时间窗口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文