取消 RX.Net Observer 正在进行的 OnNext 方法 [英] Cancel RX.Net Observer's ongoing OnNext methods

查看:37
本文介绍了取消 RX.Net Observer 正在进行的 OnNext 方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如我的原始问题所述(参见 将相互依赖的事件流与RX.Net) 我有一个 RX.net 事件流,只要没有触发某个其他事件,它就只会调用观察者的 OnNext 方法(基本上,只要系统连接,就可以处理更改-* 事件,断开连接时暂停并在系统重新连接后重新开始处理 Change-* 事件).

As described in my original question (see Correlate interdependent Event Streams with RX.Net) I have an RX.net event stream that shall only call the observer's OnNext method as long as a certain other event is not triggered (basically 'Handle Change-* Events as long as the system is connected, pause while disconnected and re-start handling of the Change-* events once the system has re-connected).

然而,虽然这对新事件很顺利,但我如何取消/向正在进行 .OnNext() 调用发出取消信号?

However, while this works smoothly with new events, how would I cancel / signal cancellation to ongoing .OnNext() calls?

推荐答案

由于您的观察者已经被编写为接受 CancellationToken,我们可以修改您的 Rx 流以提供一个与事件数据一起.我们将使用 Rx CancellationDisposable,我们将在取消订阅流时处理它.

Since your observer is already written to accept a CancellationToken, we can just modify your Rx stream to supply one along with the event data. We'll use the Rx CancellationDisposable that we will dispose of whenever the stream is unsubscribed.

// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)
{
    return Observable.Using(
        () => new CancellationDisposable(),
        cts => source.Select(item => Tuple.Create(cts.Token, item)));
}

将其与另一个问题的解决方案放在一起:

Putting this together with the solution from the other question:

DataSourceLoaded
    .SelectMany(_ => DataSourceFieldChanged
        .Throttle(x)
        .CancelOnUnsubscribe()
        .TakeUntil(DataSourceLoaded))
    .Subscribe(c => handler(c.Item1, c.Item2));

TakeUntil 子句被触发时,它将取消订阅 CancelOnUnsubscribe observable,这将依次处理 CancellationDisposable 并导致要取消的令牌.您的观察者可以观察此令牌并在发生这种情况时停止其工作.

When the TakeUntil clause is triggered, it will unsubscribe from the CancelOnUnsubscribe observable, which will in turn dispose of the CancellationDisposable and cause the token to be cancelled. Your observer can watch this token and stop its work when this happens.

这篇关于取消 RX.Net Observer 正在进行的 OnNext 方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆