如何从 Rx 订阅回调异步函数? [英] How to call back async function from Rx Subscribe?
问题描述
我想在 Rx 订阅中回调异步函数.
I would like to call back an async function within an Rx subscription.
例如像这样:
public class Consumer
{
private readonly Service _service = new Service();
public ReplaySubject<string> Results = new ReplaySubject<string>();
public void Trigger()
{
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
}
public Task RunAsync()
{
return _service.DoAsync();
}
}
public class Service
{
public async Task<string> DoAsync()
{
return await Task.Run(() => Do());
}
private static string Do()
{
Thread.Sleep(TimeSpan.FromMilliseconds(200));
throw new ArgumentException("invalid!");
return "foobar";
}
}
[Test]
public async Task Test()
{
var sut = new Consumer();
sut.Trigger();
var result = await sut.Results.FirstAsync();
}
需要做什么才能正确捕获异常?
What needs to be done, in order to catch the exception properly?
推荐答案
您不想将 async
方法传递给 Subscribe
,因为这会创建一个 async void
方法.尽量避免async void
.
You don't want to pass an async
method to Subscribe
, because that will create an async void
method. Do your best to avoid async void
.
在您的情况下,我认为您想要的是为序列的每个元素调用 async
方法,然后缓存所有结果.在这种情况下,使用 SelectMany
为每个元素调用 async
方法,并使用 Replay
进行缓存(加上一个 Connect
> 让球滚动):
In your case, I think what you want is to call the async
method for each element of the sequence and then cache all the results. In that case, use SelectMany
to call the async
method for each element, and Replay
to cache (plus a Connect
to get the ball rolling):
public class Consumer
{
private readonly Service _service = new Service();
public IObservable<string> Trigger()
{
var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(_ => RunAsync())
.Replay();
connectible.Connect();
return connectible;
}
public Task<string> RunAsync()
{
return _service.DoAsync();
}
}
我更改了从 Trigger
方法返回的 Results
属性,我认为这更有意义,因此测试现在看起来像:
I changed the Results
property to be returned from the Trigger
method instead, which I think makes more sense, so the test now looks like:
[Test]
public async Task Test()
{
var sut = new Consumer();
var results = sut.Trigger();
var result = await results.FirstAsync();
}
这篇关于如何从 Rx 订阅回调异步函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!