Rx的可持续历史流和直播 [英] Advanceable historical stream and live stream in Rx

查看:176
本文介绍了Rx的可持续历史流和直播的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个热点观察,我通常使用正常的主题下面实现,以便有兴趣的人可以订阅一个通知流。



现在我想保留直播,但也暴露了所有事件的历史流,并已将绝对时间附加到这些通知知道他们什么时候发生的事情如果允许订阅者将历史流推进到任何时间点,然后再播放年表。




  • 我相信大部分可以通过标题]

    解决方案

    HistoricalScheduler 给你的是能够控制调度器虚拟时间的向前运动。 p>

    你不能得到的是随着时间的随机访问。随着虚拟时间的提前,计划的动作被执行,所以必须提前安排。过去执行的任何行动 - 即在 HistoricalScheduler.Now 值之后的绝对时间 - 将立即执行。



    要重播事件,您需要以某种方式进行记录,然后使用 HistoricalScheduler 的实例安排它们,然后提前执行时间。



    当您提前时间时,计划的动作将在其到期时间执行 - 当可观察者向其订阅者发送 OnXXX()时, Now 调度器的属性将具有当前的虚拟时间。



    每个用户都需要访问自己的调度程序才能控制时间独立于其他用户。这实际上意味着每个订阅者创建一个可观察的。



    这是一个快速的例子,我敲了敲(如果你引用了nuget软件包rx-main,它将在LINQPad中运行) p>

    首先,我将录制事件录制到列表中的直播(完全非生产方式!)。如你所建议的,使用 TimeStamp()可以很好地捕捉时间:

      / *录制直播流* / 
    var source = Observable.Interval(TimeSpan.FromSeconds(1));
    var log = source.Take(5).Timestamp()。ToList()。Wait();


    Console.WriteLine(Time now is+ DateTime.Now);

    现在我们可以使用HistoricalScheduler结合狡猾的使用Generate来调度事件。请注意,这种方法可以防止大量排定的事件提前排队 - 相反,我们只是一次安排一个:

      var scheduler = new HistoricalScheduler(); 

    / *设置记录事件的调度* /
    var replay = Observable.Generate(
    log.GetEnumerator(),
    events =>事件.MoveNext(),
    events => events,
    events => events.Current.Value,
    events => events.Current.Timestamp,
    scheduler);

    现在我们订阅了,你可以看到 HistoricalScheduler 现在属性具有事件的虚拟时间:

      replay.Subscribe(
    i => Console.WriteLine(事件:{0}发生在{1},i,
    scheduler.Now));

    最后我们可以开始安排(使用Start()只是试图播放所有的事件,而不是使用 AdvanceTo 移动到特定的时间 - 就像做$ code> AdvanceTo(DateTime.MaxValue);

      scheduler.Start(); 

    我的输出是:

     现在时间是07/01/2014 15:17:27 
    事件:0发生在07/01/2014 15:17:23 +00:00
    事件:1发生在07/01/2014 15:17:24 +00:00
    事件:2发生在07/01/2014 15:17:25 +00:00
    事件:3发生在07/01/2014 15:17:26 +00:00
    事件:4发生在07 / 01/2014 15:17:27 +00:00

    结果是你可能会结束必须通过此工具创建自己的API,以获得适合您特定目的的东西,这给您一些相当的工作 - 但是仍然是非常强大的东西。



    什么是好的是现场观察和重播d可观察真的看起来没有什么不同 - 只要你记得总是参数化你的调度程序(!) - 所以可以有相同的查询容易地运行在它们之间,时间查询都使用调度程序的虚拟时间。



    我已经用这个来测试旧的数据的新查询,在商业场景中效果非常好。



    不想成为的是传输控制 ,例如在GUI中通过时间滚动来滚动。通常,您以大块方式运行历史记录,存储新查询的输出,然后在GUI中使用此数据进行后续显示,以便用户可以通过您提供的其他机制随时来回移动。



    最后,您不需要 ReplaySubject 来缓存直播流;但是您需要一些记录事件的方法来重播 - 这可能只是写入日志的观察者。


    I have a hot observable that I normally implement using a normal Subject underneath, so that those interested could subscribe to a live a stream of notifications.

    Now I would like to keep that live stream, but also expose a historical stream of all the events that have been AND have absolute times attached to those notifications to know when exactly they happened AS WELL AS allow the subscribers to advance the historical stream to any point in time before replaying the chronology.

    • I believe most of this could be achieved with a HistoricalScheduler and its AdvanceTo method, but I'm not sure exactly how?
    • And is use of Timestamped to save the times of the events needed?
    • And is a ReplaySubject needed to cache the live stream into historical records which could then be played back using the HistoricalScheduler?

    How exactly can those two streams be implemented for the same source, or in other words, how can the below be appropriated to the current requirements?

    [ see "Replaying the past" heading ]

    解决方案

    What the HistoricalScheduler gives you is the ability to control the forward motion of the virtual time of the scheduler.

    What you do not get is random access over time. As virtual time is advanced, scheduled actions are executed, so they must be scheduled in advance. Any action scheduled in the past - i.e. at an absolute time that is behind the HistoricalScheduler.Now value - is executed immediately.

    To replay events, you need to record them somehow, then schedule them using an instance of a HistoricalScheduler - and then advance time.

    When you advance time, scheduled actions are executed at their due times - and when observables send OnXXX() to their subscribers, the Now property of the scheduler will have the current virtual time.

    Each subscriber will need access to it's own scheduler in order to control time independently of other subscribers. This effectively means creating an observable per subscriber.

    Here is a quick example I knocked up (that would run in LINQPad if you referenced nuget package rx-main).

    First I record a live stream (in a totally non-production way!) recording events into a list. As you suggest, use of TimeStamp() works well to capture timing:

    /* record a live stream */
    var source = Observable.Interval(TimeSpan.FromSeconds(1));
    var log = source.Take(5).Timestamp().ToList().Wait();
    
    
    Console.WriteLine("Time now is " + DateTime.Now);
    

    Now we can use the HistoricalScheduler combined with cunning use of Generate to schedule events. Note that this approach prevents a ton of scheduled events being queued up in advance - instead we are just scheduling one at a time:

    var scheduler = new HistoricalScheduler();
    
    /* set up the scheduling of the recording events */
    var replay = Observable.Generate(
        log.GetEnumerator(),
        events => events.MoveNext(),
        events => events,
        events => events.Current.Value,
        events => events.Current.Timestamp,
        scheduler);
    

    Now when we subscribe, you can see that the HistoricalScheduler's Now property has the virtual time of the event:

    replay.Subscribe(
        i => Console.WriteLine("Event: {0} happened at {1}", i,
        scheduler.Now)); 
    

    Finally we can start the schedule (using Start() just tries to play all events, as opposed to using AdvanceTo to move to a specific time - it's like doing AdvanceTo(DateTime.MaxValue);

    scheduler.Start();
    

    The output for me was:

    Time now is 07/01/2014 15:17:27
    Event: 0 happened at 07/01/2014 15:17:23 +00:00
    Event: 1 happened at 07/01/2014 15:17:24 +00:00
    Event: 2 happened at 07/01/2014 15:17:25 +00:00
    Event: 3 happened at 07/01/2014 15:17:26 +00:00
    Event: 4 happened at 07/01/2014 15:17:27 +00:00
    

    The upshot is that you'll probably end up having to create your own API over this tool to get something to suit your particular purposes. It leaves you a fair bit of work - but is nonetheless pretty powerful stuff.

    What's nice is that the live observable and the replayed observable really look no different from each other - provided you remember to always parameterise your scheduler (!) - and so can have the same queries easily run over them, with temporal queries all working with the virtual time of the scheduler.

    I've used this to test out new queries over old data to great effect in commercial scenarios.

    What it isn't trying to be is a transport control, such as to serve scrolling back and forth through time in a GUI. Typically you run the history in big chunks, storing the output of new queries, and then use this data for subsequent display in a GUI so users can move back and forth at leisure via some other mechanism you provide.

    Finally, you don't need ReplaySubject to cache the live stream; but you do need some means of recording events for replay - this could just be an observer that writes to a log.

    这篇关于Rx的可持续历史流和直播的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆