可前进的历史流和接收实时流 [英] Advanceable historical stream and live stream in Rx

查看:199
本文介绍了可前进的历史流和接收实时流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个观察的热点,我通常实现使用普通的主题下面,让那些有兴趣的可以订阅一个生活的通知流。

I have a hot observable that I normally implement using a normal Subject underneath, so that those interested could subscribe to a live a stream of notifications.

现在我想保持这种实时数据流,同时也暴露了一直,并有连接到这些通知,以绝对时间的所有事件的历史流知道什么时候正是他们所发生的,以及允许用户对历史流推进到任意时间点重放年代之前。


    <李>我相信大多数的,这可能与的 HistoricalScheduler 及其AdvanceTo方法,但我不知道究竟如何?
  • ,是使用的Timestamped 保存需要的?

  • 事件的时代,是一个<一个HREF =htt​​p://msdn.microsoft.com/en-us/library/hh211810%28v=vs.103%29.aspx> ReplaySubject 以实时数据流缓存到史料记载则可能是需要重放使用HistoricalScheduler?

  • I believe most of this could be achieved with a HistoricalScheduler and its AdvanceTo method, but I'm not sure exactly how?
  • And is use of Timestamped to save the times of the events needed?
  • And is a ReplaySubject needed to cache the live stream into historical records which could then be played back using the HistoricalScheduler?

究竟怎样这两个数据流的相同的源中实现,或者换句话说,如何能下面将划拨给当前的需求?

How exactly can those two streams be implemented for the same source, or in other words, how can the below be appropriated to the current requirements?

[见的重播过去采用虚时间scheduling.aspx测试-RX-查询-标题]

推荐答案

什么 HistoricalScheduler 给你是控制的调度程序的虚拟时间向前运动的能力。

What the HistoricalScheduler gives you is the ability to control the forward motion of the virtual time of the scheduler.

你所不明白的是随着时间的推移随机访问。随着虚拟时间被提前,调度动作被执行,所以他们必须提前预定。计划在以往任何行动 - 即在其后面的 HistoricalScheduler.Now 值的绝对时间 - 立即执行

What you do not get is random access over time. As virtual time is advanced, scheduled actions are executed, so they must be scheduled in advance. Any action scheduled in the past - i.e. at an absolute time that is behind the HistoricalScheduler.Now value - is executed immediately.

要重播事件,需要以某种方式记录下来,然后用 HistoricalScheduler 的一个实例调度它们 - 然后提前时间

To replay events, you need to record them somehow, then schedule them using an instance of a HistoricalScheduler - and then advance time.

在提前时间,调度操作在其到期时间执行 - 当观测发送 OnXXX()为他们的用户,在立即调度的属性将具有当前虚拟时间

When you advance time, scheduled actions are executed at their due times - and when observables send OnXXX() to their subscribers, the Now property of the scheduler will have the current virtual time.

每个订户需要它自己的调度程序,以控制时间访问独立于其他用户。这实际上意味着建立每个用户可观察到的。

Each subscriber will need access to it's own scheduler in order to control time independently of other subscribers. This effectively means creating an observable per subscriber.

下面是一个简单的例子我敲了(这将在LINQPad如果你引用的NuGet包RX-主运行)。

Here is a quick example I knocked up (that would run in LINQPad if you referenced nuget package rx-main).

首先,我录制的实时流(在一个完全非生产的方式!)记录事件到一个列表。正如你提到的,使用的时间戳()能很好地捕捉时机:

First I record a live stream (in a totally non-production way!) recording events into a list. As you suggest, use of TimeStamp() works well to capture timing:

/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();


Console.WriteLine("Time now is " + DateTime.Now);

现在我们可以使用HistoricalScheduler狡猾使用生成安排活动相结合。注意,这种方法可以防止一吨预定的事件被排队提前 - 相反,我们只是安排一次:

Now we can use the HistoricalScheduler combined with cunning use of Generate to schedule events. Note that this approach prevents a ton of scheduled events being queued up in advance - instead we are just scheduling one at a time:

var scheduler = new HistoricalScheduler();

/* set up the scheduling of the recording events */
var replay = Observable.Generate(
    log.GetEnumerator(),
    events => events.MoveNext(),
    events => events,
    events => events.Current.Value,
    events => events.Current.Timestamp,
    scheduler);

现在,当我们订阅,您可以看到 HistoricalScheduler 现在酒店事件的虚拟时间:

Now when we subscribe, you can see that the HistoricalScheduler's Now property has the virtual time of the event:

replay.Subscribe(
    i => Console.WriteLine("Event: {0} happened at {1}", i,
    scheduler.Now)); 



最后,我们可以开始时间表(使用开始()只是试图打所有的事件,而不是使用 AdvanceTo 移动到一个特定的时间 - 这就像做 AdvanceTo(DateTime.MaxValue);

Finally we can start the schedule (using Start() just tries to play all events, as opposed to using AdvanceTo to move to a specific time - it's like doing AdvanceTo(DateTime.MaxValue);

scheduler.Start();

对我来说,产量为:

Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00

其结果是,你可能会结束不得不在这个工具得到的东西,以满足您的特殊用途,以创建自己的API,它让你的工作的公平位。 - 但仍然非常强大的东西。

The upshot is that you'll probably end up having to create your own API over this tool to get something to suit your particular purposes. It leaves you a fair bit of work - but is nonetheless pretty powerful stuff.

什么是好的是,现场观察到的和重播观察到确实看起来没有彼此不同 - (!),只要你记得要经常进行参数设置你的调度程序 - 这样可以有相同的查询,轻松过他们跑了,所有与工作时间查询调度的虚拟时间。

What's nice is that the live observable and the replayed observable really look no different from each other - provided you remember to always parameterise your scheduler (!) - and so can have the same queries easily run over them, with temporal queries all working with the virtual time of the scheduler.

我用这个来测试覆盖旧数据的新查询商业情景有很大的影响。

I've used this to test out new queries over old data to great effect in commercial scenarios.

它是什么并不想成为一个传输控制,如服务于一个GUI来回滚动,通过的时间。通常情况下,你在大块运行的历史,保存新查询的输出结果,然后利用这些数据的后续的显示在图形用户界面,使用户可以通过你提供一些其他机制在闲暇来回移动。

What it isn't trying to be is a transport control, such as to serve scrolling back and forth through time in a GUI. Typically you run the history in big chunks, storing the output of new queries, and then use this data for subsequent display in a GUI so users can move back and forth at leisure via some other mechanism you provide.

最后,你不需要 ReplaySubject 缓存实时流;但你需要记录事件重播的一些手段 - 这可能仅仅是写入日志观察者

Finally, you don't need ReplaySubject to cache the live stream; but you do need some means of recording events for replay - this could just be an observer that writes to a log.

这篇关于可前进的历史流和接收实时流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆