从链式任务可观测 [英] Observable from chained Tasks
问题描述
我试图创建一个可观察的,其中每个项目通过异步任务产生的。下一项目应该经由在previous项目(共递归)的结果一个异步呼叫来制造。在生成的说法,这将是这个样子 - 除了生成不支持异步(也不支持在初始状态委托
I'm trying to create an Observable where each item is produced via an asynchronous task. The next item should be produced via an async call on the result of the previous item (co-recursion). In "Generate" parlance this would look something like this - except that Generate does not support async (nor does it support the delegate on the initial state.
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
作为一个更具体的例子,通过一次读取其中100条消息偷看从ServiceBus队列中的所有邮件,实现ProduceFirst,继续和ProduceNext如下:
As a more concrete example, to peek all messages from a ServiceBus queue by fetching them 100 messages at a time, implement ProduceFirst, Continue and ProduceNext as follows:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
然后调用 .SelectMany(I = I标记)
在的IObservable&LT; IEnumerable的&LT; BrokeredMessage&GT;&GT;
把它变成一个的IObservable&LT; BrokeredMessage&GT;
在哪里_serviceBusReceiver是一个接口的实例如下:
Where _serviceBusReceiver is an instance of an interface as follows:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
和BrokeredMessage是从的https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
And BrokeredMessage is from https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
推荐答案
如果你要推出自己的异步生成
的功能,我会建议使用递归调度代替包装的while循环。
If you are going to roll your own async Generate
function I would recommend the use of recursive scheduling instead of wrapping a while loop.
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TResult>(async obs => {
return s.Schedule(await initialState(), async (state, self) =>
{
if (!condition(state))
{
obs.OnCompleted();
return;
}
obs.OnNext(resultSelector(state));
self(await iterate(state));
});
});
}
这有几个优点。首先,你可以取消这一点,用一个简单的while循环是没有办法直接取消它,事实上,直到观察到的完成你甚至不返回的订阅功能。其次,这可以让你控制每个项目的调度/异步(这使得测试轻而易举),这也使得它成为图书馆更好的整体配合
This has a couple of advantages. First, you are able to cancel this, with a simple while loop there is no way to cancel it directly, in fact you don't even return for the subscribe function until the observable has completed. Second, this lets you control the scheduling/asynchrony of each item (which makes testing a breeze), this also makes it a better overall fit for library
这篇关于从链式任务可观测的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!