可观察订阅如何优雅地终止? [英] How is an observable subscription gracefully terminated?

查看:46
本文介绍了可观察订阅如何优雅地终止?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Reactive Extensions (Rx) 来处理数据流.但是,每个元素的处理可能需要一些时间.为了中断处理,我使用了 CancellationToken,它有效地停止了订阅.

I'm attempting to use Reactive Extensions (Rx) to process a stream of data. The processing of each element may take some time, though. To break the processing, I'm using a CancellationToken, which effectively stops the subscription.

当请求取消时,我如何优雅地完成当前工作并正确终止而不丢失任何数据?

When cancel has been requested, how do I gracefully finish the current work and terminate properly without losing any data?

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

observable
    .Subscribe(
        value =>
            {
                Console.WriteLine(value);
                Thread.Sleep(500); // Simulate processing
                
                if (cts.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation detected on {0}.", value);
                    Thread.Sleep(500); // Simulate some time consuming shutdown
                    Console.WriteLine("Cleaning up done for {0}.", value);
                }
            },
        () => Console.WriteLine("Completed"),
        cts.Token);
        
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");

输出

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.

从输出中可以看出,作业终止"这一行是不是最后一行,这意味着在应用程序终止之前清理将没有足够的时间完成.

As can be seen from the output, the line "Job terminated" is not the last line, which means that the cleanup would not have had enough time to finish up before the application has terminated.

0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.

作业终止"行是要打印的最后一行.取消"和清洁"线路已经被允许慢慢来.

The line "Job terminated" is the very last line to be printed. The "Cancellation" and "Cleaning" lines have been allowed to take their time.

(添加了预期输出)

推荐答案

Observables are (a)waitable.对 observable 的订阅是不可等待的.因此,如果您想等待您的订阅代码完成,而不求助于使用 ManualResetEvent 等人工解决方案,您应该使您的订阅代码成为派生 observable 的副作用,并且 (a) 等待可观察的.您的问题中提供的示例有额外的要求,这使问题有点复杂,但没有那么多:

Observables are (a)waitable. Subscriptions to observables are not awaitable. So if you want to wait your subscription code to complete, without resorting to artificial solutions like using ManualResetEvents, you should make your subscription code a side-effect of a derived observable, and (a)wait that observable. The example presented in your question has additional requirements, that complicate matters a bit, but not that much:

  1. 您想在订阅 observable 和等待它完成之间做其他事情(Console.ReadLine() 等).

您想在取消 CancellationToken 时终止 observable.

You want to terminate the observable when a CancellationToken is canceled.

以下是如何满足这些要求的示例.它仅展示了解决此问题的众多可用方法之一:

Below is an example of how to address these requirements. It shows just one of the many available ways to solve this problem:

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

var withCancellation = observable
    .TakeUntil(Observable.Create<Unit>(observer =>
        cts.Token.Register(() => observer.OnNext(default))));

var withSideEffectsAndCancellation = withCancellation
    .Do(value =>
    {
        Console.WriteLine(value);
        Thread.Sleep(500);

        if (cts.Token.IsCancellationRequested)
        {
            Console.WriteLine("Cancellation detected on {0}.", value);
            Thread.Sleep(500);
            Console.WriteLine("Cleaning up done for {0}.", value);
        }
    }, () => Console.WriteLine("Completed"));

var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
    .Publish()
    .AutoConnect(0);

Console.ReadLine();
cts.Cancel();

hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");

说明:

  1. .TakeUntil...cts.Token.Register... 是一种从 Interval observable 立即取消订阅的惯用方法,当 cts.Token 被取消.它是从 相关问题.您也可以使用更简单的 .TakeWhile(x => !cts.Token.IsCancellationRequested),前提是您可以取消响应性稍差的取消.

  1. The .TakeUntil...cts.Token.Register... is an idiomatic way to unsubscribe instantly from the Interval observable, when the cts.Token is canceled. It is copy-pasted from a relevant question. You could also use the simpler .TakeWhile(x => !cts.Token.IsCancellationRequested), provided that you are OK with a slightly less responsive cancellation.

Do 运算符是执行订阅副作用的自然方式,因为它与 Subscribe 方法具有相同的参数.

The Do operator is a natural way to perform the subscription side-effects, because it has the same parameters with the Subscribe method.

.Publish().AutoConnect(0); 立即使序列变热.AutoConnect 运算符无法断开与底层 observable 的连接(与 RefCount 运算符相反),但在这种特殊情况下,不需要断开连接功能.底层 observable 的生命周期已经由我们之前附加的 CancellationToken 控制.

The .Publish().AutoConnect(0); makes the sequence hot right away. The AutoConnect operator offers no way to disconnect from the underlying observable (as opposed to the RefCount operator), but in this particular case the disconnect functionality is not needed. The lifetime of the underlying observable is already controlled by the CancellationToken that we attached previously.

.Wait() 之前的 .DefaultIfEmpty() 是必需的,以防止在极端情况下发生 InvalidOperationException在产生任何元素之前取消序列.如果您await 异步序列,它也是必需的.这些等待 observable 的机制(以及其他类似 RunAsyncToTask 运算符)正在返回 observable 发出的最后一个值,当没有这样的值时,他们会感到沮丧价值存在.

The .DefaultIfEmpty() before the .Wait() is required in order to prevent an InvalidOperationException in the edge case that the sequence is canceled before producing any element. It is also required if you await asynchronously the sequence. These mechanisms to wait an observable (as well as others like the RunAsync and the ToTask operators) are returning the last value emitted by the observable, and they become frustrated when no such value exists.

这篇关于可观察订阅如何优雅地终止?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆