Rx - 如何创建 IObservable<T>来自任务 T这样取消订阅会取消任务吗? [英] Rx - How to create IObservable&lt;T&gt; from Task&lt;T&gt; such that unsubscribing cancels the task?

查看:19
本文介绍了Rx - 如何创建 IObservable<T>来自任务 T这样取消订阅会取消任务吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Rx 的新手,请耐心等待.

I'm new to Rx so bear with me.

我想将 Task 包装在 IObservable 中.到目前为止一切顺利:

I want to wrap a Task<T> in an IObservable<T>. So far so good:

Task<T> task = Task.Factory.StartNew(...);
IObservable<T> obs = task.ToObservable();

现在,我想要的是在观察者取消订阅时通知任务取消:

Now, what I want is to signal the task to cancel when the observer unsubscribes:

var cancel = new CancellationToken();
Task<T> task = Task.Factory.StartNew(..., cancel);

IObservable<T> obs = task.ToObservable(); //there should be a way to tie the cancel token
                                          //to the IObservable (?)

IDisposable disposable = obs.Subscribe(...);
Thread.Sleep(1000);
disposable.Dispose(); // this should signal the task to cancel

我该怎么做?

FWIW 是生成此切线的场景:Rx 和任务 - 生成新任务时取消正在运行的任务?

FWIW here's the scenario that generated this tangent: Rx and tasks - cancel running task when new task is spawned?

推荐答案

这是我能想到的最简单的方法,使用Observable.Create:

Here's the simplest way I can think of, using Observable.Create:

static IObservable<int> SomeRxWork()
{
    return Observable.Create<int>(o =>
    {
        CancellationTokenSource cts = new CancellationTokenSource();
        IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o);
        return new CompositeDisposable(sub, new CancellationDisposable(cts));
    });
}

static Task<int> SomeAsyncWork(CancellationToken token);

我在评论中暗示的最初方式实际上相当冗长:

The initial way I hinted at in the comments is actually rather verbose:

static IObservable<int> SomeRxWork()
{
    return Observable.Create<int>(async (o, token) =>
    {
        try
        {
            o.OnNext(await SomeAsyncWork(token));
            o.OnCompleted();
        }
        catch (OperationCanceledException)
        {
        }
        catch (Exception ex)
        {
            o.OnError(ex);
        }
    });
}

这篇关于Rx - 如何创建 IObservable<T>来自任务 T这样取消订阅会取消任务吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆