如何从 Rx 订阅回调异步函数? [英] How to call back async function from Rx Subscribe?

查看:31
本文介绍了如何从 Rx 订阅回调异步函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在 Rx 订阅中回调异步函数.

I would like to call back an async function within an Rx subscription.

例如像这样:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

需要做什么才能正确捕获异常?

What needs to be done, in order to catch the exception properly?

推荐答案

您不想将 async 方法传递给 Subscribe,因为这会创建一个 async void 方法.尽量避免async void.

You don't want to pass an async method to Subscribe, because that will create an async void method. Do your best to avoid async void.

在您的情况下,我认为您想要的是为序列的每个元素调用 async 方法,然后缓存所有结果.在这种情况下,使用 SelectMany 为每个元素调用 async 方法,并使用 Replay 进行缓存(加上一个 Connect> 让球滚动):

In your case, I think what you want is to call the async method for each element of the sequence and then cache all the results. In that case, use SelectMany to call the async method for each element, and Replay to cache (plus a Connect to get the ball rolling):

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}

我更改了从 Trigger 方法返回的 Results 属性,我认为这更有意义,因此测试现在看起来像:

I changed the Results property to be returned from the Trigger method instead, which I think makes more sense, so the test now looks like:

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}

这篇关于如何从 Rx 订阅回调异步函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆