反应性扩展滑动时间窗口 [英] reactive extensions sliding time window

查看:42
本文介绍了反应性扩展滑动时间窗口的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我要输入一系列的股票报价,我想在最后一小时获取所有数据并对其进行一些处理.我正在尝试通过反应性扩展2.0实现这一目标.我在另一篇文章中阅读了使用Interval的内容,但我认为这已被弃用.

I have a sequence of stock ticks coming in and I want to take all the data in the last hour and do some processing on it. I am trying to achieve this with reactive extensions 2.0. I read on another post to use Interval but i think that is deprecated.

推荐答案

此扩展方法能否解决您的问题?

Would this extension method solve your problem?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
            {
                list.RemoveFirst();
            }
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}

这篇关于反应性扩展滑动时间窗口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆