Rx 框架:在不中断原始可观察序列的情况下执行超时操作 [英] Rx Framework: execute an action on timeout without interrupting the original observable sequence

查看:28
本文介绍了Rx 框架:在不中断原始可观察序列的情况下执行超时操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

给定一个可观察的源,通过轮询低级设备的(a)状态生成...

Given an observable source, generated by polling the (changes of a) state of a low-level device...

// observable source metacode:
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(tick => new DeviceState(_device.ReadValue()))
    .DistinctUntilChanged();

... 以及更新 UI 的消费者...

... and a consumer that updates the UI...

// UI metacode:
service.GetObservableDeviceStates()
    .Subscribe(state => viewModel.CurrentState = state.ToString());

... 我需要在源不活动"的 x 秒后执行自定义操作,而不会中断对源的订阅.像这样:

... I need to execute a custom action after x seconds of source's "inactivity", without interrupting the subscription to source. Something like this:

// UI metacode:
service.GetObservableDeviceStates()
    .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
    .Subscribe(state => viewModel.CurrentState = state.ToString());

最佳做法是什么?想到的可能解决方案是(我是 Rx 菜鸟):

What are the best practices? Possible solutions that come to mind are (I'm a Rx noob):

  1. 缓冲区(即使它不那么易读)
  2. 玩弄这个超时过载;
  3. 当没有任何变化时返回一些特殊的服务端"(而不是使用 DistinctUntilChanged)并在 UI 代码上处理它:

  1. Buffer (even if it's not so readable)
  2. Playing around this Timeout overload;
  3. Returning something special "service-side" when nothing changes (instead of using DistinctUntilChanged) and dealing with it on the UI code:

service.GetObservableDeviceStates().订阅(状态=>viewModel.CurrentState = state.Special ?"空闲" : state.ToString());

service.GetObservableDeviceStates() .Subscribe(state => viewModel.CurrentState = state.Special ? "Idle" : state.ToString());

据报道在答案中,解决方案是:

as reported in the answer, the solution is:

        service.GetObservableDeviceStates()
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .Subscribe(onTimeout);

EDIT2(警告)

如果 onNext 和 onTimeout 更新 UI 组件,为了避免 CrossThreadExceptions 两个需要 ObserveOn(uiSynchronizationContext),因为 Throttle 在另一个线程上工作!

If onNext and onTimeout updates UI components, to avoid CrossThreadExceptions two ObserveOn(uiSynchronizationContext) are needed, since Throttle works on another thread!

        service.GetObservableDeviceStates()
            .ObserveOn(uiSynchronizationContext)
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .ObserveOn(uiSynchronizationContext)
            .Subscribe(onTimeout);

推荐答案

Timeout 或多或少用于表示单个异步操作的 observables - 例如,返回默认值或 OnError 如果说observable 在一定时间内没有通知你.

Timeout is more or less meant for observables which represent single asynchronous operations - for e.g., to return a default value or OnError if said observable hasn't notified you in a certain amount of time.

您要查找的运算符是 节流,尽管一开始看起来可能不像.Throttle(p) 为您提供一个流,该流在源流没有产生周期 p 的值时产生一个值.

The operator you're looking for is Throttle, even though it may not seem like it at first. Throttle(p) gives you a stream which produces a value when the source stream has not produced a value for period p.

与您现有的代码并行,您可以使用 source.Throttle(period).Do(...side effect).

Parallel to your existing code, you can use source.Throttle(period).Do(...side effect).

这篇关于Rx 框架:在不中断原始可观察序列的情况下执行超时操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆