在一段时间内不遵循不同事件的活动的活动订阅 [英] Reactive subscription to event not followed by different event within a period of time

查看:154
本文介绍了在一段时间内不遵循不同事件的活动的活动订阅的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述



为了说明这一点,我想了解如何为特定时间窗口内不同事件发生的事件创建一个反应订阅。一个用例:



两个事件触发的忙指示。忙碌不开心他们可能会一起射击,但指示灯不应太频繁地闪光/关闭。



当NotBusy触发时,应该没有指示灯。当Busy触发并且NotBusy在5秒内没有触发时,应该显示。



有没有办法完全在Reactive中完成,而不添加外部状态?

解决方案

啊,我不能抵制 - 你多久能反驳作者的源材料?



这是我的刺(LINQPad-ready):



输出如下所示:

 在1/1/0001 12:00:00 AM +00 :00,忙信号指示灯现在:OFF 
发送BUSY在1/1/0001 12:00:00 AM +00:00
发送NOTBUSY在1/1/0001 12:00:02 AM +00:00

发送BUSY在1/1/0001 12:00:03 AM +00:00
发送BUSY在1/1/0001 12:00:06 AM +00 :00
在1/1/0001 12:00:08 AM +00:00,忙信号指示灯现在是:ON
发送NOTBUSY在1/1/0001 12:00:09 AM +00 :00
在1/1/0001 12:00:09 AM +00:00,忙信号指示灯现在是:OFF

这里是我们事件的基本定义:

 枚举EventType 
{
Busy,
NotBusy
}
class StreamEvent
{
public EventType Type {get; set;}
public StreamEvent(EventType type){Type = type;}
}

这里是查询+测试代码:

  void Main()
{
//我们的模拟事件流
var fakeSource = new System.Reactive.Subjects.Subject< StreamEvent>();

//我们使用一个调度程序,我们实际上不必等待
var theTardis = new System.Reactive.Concurrency.HistoricalScheduler();

var busySignal = fakeSource
//批处理事件:
.Window(
//在忙信号上开始批处理
fakeSource.Where(e => e.Type == EventType.Busy),
//在不忙信号上停止批处理
(open)=> fakeSource.Where(e => e.Type == EventType .NotBusy)
//但是如果每个窗口超过5秒,则抛出一个超时
.Timeout(TimeSpan.FromSeconds(5),theTardis))
//打开windows
.Switch()
//捕获任何超时异常并在流中注入一个NULL
.Catch(fakeSource.StartWith((StreamEvent)null))
// Bool-ify on发生超时?
.Select(e => e == null)
//以unbusy状态开始
.StartWith(false)
//只告诉我们关于转换
.DistinctUntilChanged();

使用(busySignal.Subscribe(signal =>
Console.WriteLine(At {0},Busy Signal Indicator is now:{1},
theTardis.Now ,
signal?ON:OFF)))
{
//不应该生成忙信号
Console.WriteLine(发送BUSY在{0} ,Tardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(2));
Console.WriteLine(发送NOTBUSY在{0},theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.NotBusy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
Console.WriteLine();

//应该生成一个忙信号
Console.WriteLine(发送BUSY在{0},theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(3));
Console.WriteLine(发送BUSY在{0},theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(3));

//并且这应该清除
Console.WriteLine(发送NOTBUSY在{0},theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.NotBusy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
Console.WriteLine();
}
}


I'm trying to figure out how to create a Reactive subscription for an event not followed by a different event within a specific time window.

To illustrate, here's a use case:

A busy indicator triggered by two events. Busy and NotBusy. They may fire close together but the indicator should not flash on/off too often.

When NotBusy fires, there should be no indicator. When Busy fires and NotBusy hasn't fired within 5 seconds then it should display.

Is there a way to do this entirely within Reactive without adding external state?

解决方案

Ah, I can't resist - how often do you get to counter-answer the author of the source material? (see Benjol's comment on question) :)

Here's my stab at it (LINQPad-ready):

Output looks like this:

At 1/1/0001 12:00:00 AM +00:00, Busy Signal Indicator is now:OFF
Sending BUSY at 1/1/0001 12:00:00 AM +00:00
Sending NOTBUSY at 1/1/0001 12:00:02 AM +00:00

Sending BUSY at 1/1/0001 12:00:03 AM +00:00
Sending BUSY at 1/1/0001 12:00:06 AM +00:00
At 1/1/0001 12:00:08 AM +00:00, Busy Signal Indicator is now:ON
Sending NOTBUSY at 1/1/0001 12:00:09 AM +00:00
At 1/1/0001 12:00:09 AM +00:00, Busy Signal Indicator is now:OFF

Here a basic definition of our events:

enum EventType 
{ 
    Busy, 
    NotBusy 
}
class StreamEvent 
{ 
     public EventType Type {get; set;} 
     public StreamEvent(EventType type) { Type = type;}
}

And here's the query + test code:

void Main()
{
    // our simulated event stream
    var fakeSource = new System.Reactive.Subjects.Subject<StreamEvent>();

    // Let's use a scheduler we actually don't have to wait for
    var theTardis = new System.Reactive.Concurrency.HistoricalScheduler();

    var busySignal = fakeSource
        // Batch up events:
        .Window(
            // Starting batching on a busy signal
            fakeSource.Where(e => e.Type == EventType.Busy),
            // Stop batching on a not busy signal
            (open) => fakeSource.Where(e => e.Type == EventType.NotBusy)
                // but throw a timeout if we exceed 5 seconds per window
                .Timeout(TimeSpan.FromSeconds(5), theTardis))       
        // Unpack the windows
        .Switch()
        // Catch any timeout exception and inject a NULL into the stream        
        .Catch(fakeSource.StartWith((StreamEvent)null))
        // Bool-ify on "did a timeout happen?"
        .Select(e => e == null)
        // Start in an "unbusy" state
        .StartWith(false)
        // And only tell us about transitions
        .DistinctUntilChanged();    

    using(busySignal.Subscribe(signal => 
        Console.WriteLine("At {0}, Busy Signal Indicator is now:{1}",
            theTardis.Now,
            signal ? "ON" : "OFF")))
    {
        // should not generate a busy signal
        Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
        fakeSource.OnNext(new StreamEvent(EventType.Busy));
        theTardis.AdvanceBy(TimeSpan.FromSeconds(2));
        Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now);
        fakeSource.OnNext(new StreamEvent(EventType.NotBusy));
        theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
        Console.WriteLine();

        // should generate a busy signal
        Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
        fakeSource.OnNext(new StreamEvent(EventType.Busy));
        theTardis.AdvanceBy(TimeSpan.FromSeconds(3));
        Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
        fakeSource.OnNext(new StreamEvent(EventType.Busy));
        theTardis.AdvanceBy(TimeSpan.FromSeconds(3));

        // and this should clear it
        Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now);
        fakeSource.OnNext(new StreamEvent(EventType.NotBusy));      
        theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
        Console.WriteLine();    
    }
}

这篇关于在一段时间内不遵循不同事件的活动的活动订阅的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆