如何创建一个既是 Task<T> 的类?和IObservable ? T ? [英] How can I create a class that is both a Task<T> and an IObservable<T>?

查看:40
本文介绍了如何创建一个既是 Task<T> 的类?和IObservable ? T ?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

最近我遇到了一种情况,将异步操作同时表示为 TaskIObservable 将是有利的.任务表示维护操作的状态(IsCompletedIsFaulted 等),而可观察的表示允许以有趣的方式组合多个操作(Concatcode>、MergeSwitch 等),自动处理取消任何沿途已取消订阅的操作,解决这种方式的fire-and-forgotten asynchronous问题操作.所以我对结合这两种表示的方法产生了兴趣.

Recently I encountered a situation where having an asynchronous operation represented both as a Task<T> and as an IObservable<T> would be advantageous. The task representation maintains the state of the operation (IsCompleted, IsFaulted etc), while the observable representation enables the composition of multiple operations in interesting ways (Concat, Merge, Switch etc), handling automatically the cancellation of any operation that has been unsubscribed along the way, solving this way the problem of fire-and-forgotten asynchronous operations. So I've become interested about ways to combine these two representations.

组合它们的简单且可能正确的方法是通过组合:创建一个在内部存储 TaskIObservable 的类型, 并将它们公开为它的两个属性.但是在这个问题中,我对 is 一个 Taskis 的类型的挑战性和可能不切实际的可能性感兴趣IObservable 同时.一种可以直接传递给接受任务或可观察对象的 API 的类型,并在任何一种情况下都做正确的事情.所以它不能只是一个类似任务的对象.它必须从真正的东西继承,Task 类本身.像这样:

The easy, and probably correct, way to combine them would be through composition: creating a type that stores internally a Task<T> and an IObservable<T>, and exposes them as two of its properties. But in this question I am interested about the challenging, and probably impractical, possibility of a type that is a Task<T> and is an IObservable<T> at the same time. A type that can be passed directly to APIs that accept either tasks or observables, and do the right thing in either case. So it can't be just a task-like object. It must inherit from the real thing, the Task<T> class itself. Something like this:

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
    {
        //...
    }
}

创建 AsyncOperation 实例应立即调用提供的操作.换句话说,AsyncOperation 应该代表一个热门任务/可观察组合.

Creating an AsyncOperation instance should invoke immediately the supplied action. In other words an AsyncOperation should represent a hot task/observable combo.

是否可以创建这样的类型?

Is it possible to create such a type?

顺便说一句,这里是 ReactiveX/RxJava 库中的一个线程,证明其他人已经考虑过这个问题之前的问题:没有isCompleted";或错误"Observable 上的方法

Btw here is a thread in the ReactiveX/RxJava library that proves that others have thought about this problem before: No "isCompleted" or "isErrored" methods on Observable

推荐答案

我找到了一种方法来创建一个继承自 Task 的 observable,通过使用 @GlennSlayden 在 这个 答案.

I found a way to create an observable that inherits from Task, by using a genius technique described by @GlennSlayden in this answer.

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    private readonly IObservable<TResult> _observable;
    private readonly Task<TResult> _promise;

    private AsyncOperation(Func<TResult> function) : base(() => function())
        => function = this.GetResult;

    private TResult GetResult() => _promise.GetAwaiter().GetResult();

    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
        : this((Func<TResult>)null)
    {
        _observable = Observable.StartAsync(action, Scheduler.Immediate);
        _promise = _observable.ToTask();
        _promise.ContinueWith(_ => base.RunSynchronously(), default,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
        => _observable.Subscribe(observer);
}

上述解决方案并不完美,因为派生类的实例永远无法过渡到 Canceled 状态.这是一个我不知道如何修复的问题,它可能无法修复,但它可能不是很重要.取消以 TaskCanceledException 的形式出现,处理这个异常是处理取消任务的正常方式.

The above solution is not perfect because an instance of the derived class can never transition to the Canceled state. This is a problem I don't know how to fix, and it may not be fixable, but it's probably not very important. A cancellation emerges as a TaskCanceledException, and handling this exception is the normal way of dealing with canceled tasks anyway.

有趣的是,可以通过创建一个虚拟订阅并处理它来取消异步操作:

Interestingly the asynchronous operation can be canceled by creating a dummy subscription and disposing it:

var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });

operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken

我对这门课进行了一些试验,发现它不如我最初想象的那么实用.问题是存在许多支持任务和可观察对象的 API,并且在其他方​​面是相同的(例如 ConcatMergeSwitch等待等).这导致经常出现编译时错误 (CS0121 模棱两可的电话).通过将类型转换为 task 或 observable 可以解决歧义,但这很尴尬,并且首先否定了组合这两种类型的全部目的.

I experimented with this class a bit and I found that it's less practical than I initially thought it would be. The problem is that many APIs exist that support both tasks and observables, and are identical otherwise (for example Concat, Merge, Switch, Wait etc). This leads to the frequent appearance of compile-time errors (CS0121 ambiguous call). Resolving the ambiguities is possible by casting the type as either task or observable, but this is awkward, and negates the whole purpose of combining the two types in the first place.

说明:_promise.GetAwaiter().GetResult() 乍一看可能表明此实现阻塞了 ThreadPool 线程.情况并非如此,因为基础 Task 最初是冷的,只有在 _promise 完成后才会升温.

Clarification: The line _promise.GetAwaiter().GetResult() may indicate at first glance that this implementation blocks a ThreadPool thread. This is not the case because the base Task is initially cold, and it's only warmed when the _promise has completed.

这篇关于如何创建一个既是 Task&lt;T&gt; 的类?和IObservable ? T ?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆