取消RX.Net Observer持续的OnNext方法 [英] Cancel RX.Net Observer's ongoing OnNext methods

查看:581
本文介绍了取消RX.Net Observer持续的OnNext方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如我原来的问题所述(请参阅将相互依赖的事件流与RX.Net )我有一个RX.net事件流,只要调用观察者的OnNext方法,只要某个其他事件不被触发(基本上是处理变更*事件只要系统连接,暂停,断开连接并重新开始处理Change- *事件,一旦系统重新连接)。

As described in my original question (see Correlate interdependent Event Streams with RX.Net) I have an RX.net event stream that shall only call the observer's OnNext method as long as a certain other event is not triggered (basically 'Handle Change-* Events as long as the system is connected, pause while disconnected and re-start handling of the Change-* events once the system has re-connected).

然而,虽然这可以顺利地与新的事件,我取消/信号取消到正在进行的 .OnNext()调用?

However, while this works smoothly with new events, how would I cancel / signal cancellation to ongoing .OnNext() calls?

推荐答案

由于你的观察者已经写入接受 CancellationToken ,我们可以修改您的Rx流,并提供一个事件数据。我们将使用Rx CancellationDisposable ,我们将在每次流被取消订阅时处理。

Since your observer is already written to accept a CancellationToken, we can just modify your Rx stream to supply one along with the event data. We'll use the Rx CancellationDisposable that we will dispose of whenever the stream is unsubscribed.

// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)
{
    return Observable.Using(
        () => new CancellationDisposable(),
        cts => source.Select(item => Tuple.Create(cts.Token, item)));
}

将这与来自另一个问题的解决方案相结合:

Putting this together with the solution from the other question:

DataSourceLoaded
    .SelectMany(_ => DataSourceFieldChanged
        .Throttle(x)
        .CancelOnUnsubscribe()
        .TakeUntil(DataSourceLoaded))
    .Subscribe(c => handler(c.Item1, c.Item2));

当触发 TakeUntil 子句时,将取消订阅 CancelOnUnsubscribe observable,这将反过来处理 CancellationDisposable 并导致令牌被取消。当这种情况发生时,您的观察者可以观看此令牌并停止其工作。

When the TakeUntil clause is triggered, it will unsubscribe from the CancelOnUnsubscribe observable, which will in turn dispose of the CancellationDisposable and cause the token to be cancelled. Your observer can watch this token and stop its work when this happens.

这篇关于取消RX.Net Observer持续的OnNext方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆