可观察订阅如何优雅地终止? [英] How is an observable subscription gracefully terminated?
问题描述
我正在尝试使用 Reactive Extensions (Rx) 来处理数据流.但是,每个元素的处理可能需要一些时间.为了中断处理,我使用了 CancellationToken
,它有效地停止了订阅.
I'm attempting to use Reactive Extensions (Rx) to process a stream of data. The processing of each element may take some time, though. To break the processing, I'm using a CancellationToken
, which effectively stops the subscription.
当请求取消时,我如何优雅地完成当前工作并正确终止而不丢失任何数据?
When cancel has been requested, how do I gracefully finish the current work and terminate properly without losing any data?
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500); // Simulate processing
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500); // Simulate some time consuming shutdown
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
输出
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
从输出中可以看出,作业终止"这一行是不是最后一行,这意味着在应用程序终止之前清理将没有足够的时间完成.
As can be seen from the output, the line "Job terminated" is not the last line, which means that the cleanup would not have had enough time to finish up before the application has terminated.
0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.
作业终止"行是要打印的最后一行.取消"和清洁"线路已经被允许慢慢来.
The line "Job terminated" is the very last line to be printed. The "Cancellation" and "Cleaning" lines have been allowed to take their time.
(添加了预期输出)
推荐答案
Observables are (a)waitable.对 observable 的订阅是不可等待的.因此,如果您想等待您的订阅代码完成,而不求助于使用 ManualResetEvent
等人工解决方案,您应该使您的订阅代码成为派生 observable 的副作用,并且 (a) 等待可观察的.您的问题中提供的示例有额外的要求,这使问题有点复杂,但没有那么多:
Observables are (a)waitable. Subscriptions to observables are not awaitable. So if you want to wait your subscription code to complete, without resorting to artificial solutions like using ManualResetEvent
s, you should make your subscription code a side-effect of a derived observable, and (a)wait that observable. The example presented in your question has additional requirements, that complicate matters a bit, but not that much:
您想在订阅 observable 和等待它完成之间做其他事情(
Console.ReadLine()
等).
您想在取消 CancellationToken
时终止 observable.
You want to terminate the observable when a CancellationToken
is canceled.
以下是如何满足这些要求的示例.它仅展示了解决此问题的众多可用方法之一:
Below is an example of how to address these requirements. It shows just one of the many available ways to solve this problem:
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
var withCancellation = observable
.TakeUntil(Observable.Create<Unit>(observer =>
cts.Token.Register(() => observer.OnNext(default))));
var withSideEffectsAndCancellation = withCancellation
.Do(value =>
{
Console.WriteLine(value);
Thread.Sleep(500);
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500);
Console.WriteLine("Cleaning up done for {0}.", value);
}
}, () => Console.WriteLine("Completed"));
var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
.Publish()
.AutoConnect(0);
Console.ReadLine();
cts.Cancel();
hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");
说明:
.TakeUntil...cts.Token.Register...
是一种从Interval
observable 立即取消订阅的惯用方法,当cts.Token
被取消.它是从 相关问题.您也可以使用更简单的.TakeWhile(x => !cts.Token.IsCancellationRequested)
,前提是您可以取消响应性稍差的取消.
The
.TakeUntil...cts.Token.Register...
is an idiomatic way to unsubscribe instantly from theInterval
observable, when thects.Token
is canceled. It is copy-pasted from a relevant question. You could also use the simpler.TakeWhile(x => !cts.Token.IsCancellationRequested)
, provided that you are OK with a slightly less responsive cancellation.
Do
运算符是执行订阅副作用的自然方式,因为它与 Subscribe
方法具有相同的参数.
The Do
operator is a natural way to perform the subscription side-effects, because it has the same parameters with the Subscribe
method.
.Publish().AutoConnect(0);
立即使序列变热.AutoConnect
运算符无法断开与底层 observable 的连接(与 RefCount
运算符相反),但在这种特殊情况下,不需要断开连接功能.底层 observable 的生命周期已经由我们之前附加的 CancellationToken
控制.
The .Publish().AutoConnect(0);
makes the sequence hot right away. The AutoConnect
operator offers no way to disconnect from the underlying observable (as opposed to the RefCount
operator), but in this particular case the disconnect functionality is not needed. The lifetime of the underlying observable is already controlled by the CancellationToken
that we attached previously.
.Wait()
之前的 .DefaultIfEmpty()
是必需的,以防止在极端情况下发生 InvalidOperationException
在产生任何元素之前取消序列.如果您await
异步序列,它也是必需的.这些等待 observable 的机制(以及其他类似 RunAsync
和 ToTask
运算符)正在返回 observable 发出的最后一个值,当没有这样的值时,他们会感到沮丧价值存在.
The .DefaultIfEmpty()
before the .Wait()
is required in order to prevent an InvalidOperationException
in the edge case that the sequence is canceled before producing any element. It is also required if you await
asynchronously the sequence. These mechanisms to wait an observable (as well as others like the RunAsync
and the ToTask
operators) are returning the last value emitted by the observable, and they become frustrated when no such value exists.
这篇关于可观察订阅如何优雅地终止?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!