Rx - 如何创建 IObservable<T>来自任务 T这样取消订阅会取消任务吗? [英] Rx - How to create IObservable<T> from Task<T> such that unsubscribing cancels the task?
本文介绍了Rx - 如何创建 IObservable<T>来自任务 T这样取消订阅会取消任务吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我是 Rx 的新手,请耐心等待.
I'm new to Rx so bear with me.
我想将 Task
包装在 IObservable
中.到目前为止一切顺利:
I want to wrap a Task<T>
in an IObservable<T>
. So far so good:
Task<T> task = Task.Factory.StartNew(...);
IObservable<T> obs = task.ToObservable();
现在,我想要的是在观察者取消订阅时通知任务取消:
Now, what I want is to signal the task to cancel when the observer unsubscribes:
var cancel = new CancellationToken();
Task<T> task = Task.Factory.StartNew(..., cancel);
IObservable<T> obs = task.ToObservable(); //there should be a way to tie the cancel token
//to the IObservable (?)
IDisposable disposable = obs.Subscribe(...);
Thread.Sleep(1000);
disposable.Dispose(); // this should signal the task to cancel
我该怎么做?
FWIW 是生成此切线的场景:Rx 和任务 - 生成新任务时取消正在运行的任务?
FWIW here's the scenario that generated this tangent: Rx and tasks - cancel running task when new task is spawned?
推荐答案
这是我能想到的最简单的方法,使用Observable.Create
:
Here's the simplest way I can think of, using Observable.Create
:
static IObservable<int> SomeRxWork()
{
return Observable.Create<int>(o =>
{
CancellationTokenSource cts = new CancellationTokenSource();
IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o);
return new CompositeDisposable(sub, new CancellationDisposable(cts));
});
}
static Task<int> SomeAsyncWork(CancellationToken token);
我在评论中暗示的最初方式实际上相当冗长:
The initial way I hinted at in the comments is actually rather verbose:
static IObservable<int> SomeRxWork()
{
return Observable.Create<int>(async (o, token) =>
{
try
{
o.OnNext(await SomeAsyncWork(token));
o.OnCompleted();
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
o.OnError(ex);
}
});
}
这篇关于Rx - 如何创建 IObservable<T>来自任务 T这样取消订阅会取消任务吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文