利用反应性扩展(RX),可以添加“暂停"消息.命令? [英] With Reactive Extensions (RX), is it possible to add a "Pause" command?

查看:90
本文介绍了利用反应性扩展(RX),可以添加“暂停"消息.命令?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个可以接收事件流并推出另一事件流的类.

I have a class which takes in a stream of events, and pushes out another stream of events.

所有事件都使用反应性扩展(RX).使用.OnNext将事件的传入流从外部源推入IObserver<T>,使用IObservable<T>.Subscribe将事件的传出流推出.我正在使用Subject<T>在后台进行管理.

All of the events use Reactive Extensions (RX). The incoming stream of events is pushed from an external source into an IObserver<T> using .OnNext, and the outgoing stream of events is pushed out using IObservable<T> and .Subscribe. I am using Subject<T> to manage this, behind the scenes.

我想知道RX中有什么技术可以暂时暂停输出.这意味着传入事件将在内部队列中累积,并且在未暂停时,事件将再次流出.

I am wondering what techniques there are in RX to pause the output temporarily. This would mean that incoming events would build up in an internal queue, and when they are unpaused, the events would flow out again.

推荐答案

这是我使用Buffer和Window运算符的解决方案:

Here is my solution using Buffer and Window operators:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}

在订阅时打开窗口,并且每次从暂停流中接收到"true"时.暂停器提供假"值时,它将关闭.

Window is opened at subscription and every time 'true' is received from pauser stream. It closes when pauser provides 'false' value.

Buffer会执行它应该做的事情,它会缓冲来自减震器的介于'false'和'true'之间的值.一旦Buffer接收到"true",它将输出IList的值,这些值立即立即全部流化.

Buffer does what it supposed to do, buffers values that are between 'false' and 'true' from pauser. Once Buffer receives 'true' it outputs IList of values that are instantly streamed all at once.

DotNetFiddle链接: https://dotnetfiddle.net/vGU5dJ

DotNetFiddle link: https://dotnetfiddle.net/vGU5dJ

这篇关于利用反应性扩展(RX),可以添加“暂停"消息.命令?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆