从链式任务可观测 [英] Observable from chained Tasks

查看:114
本文介绍了从链式任务可观测的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图创建一个可观察的,其中每个项目通过异步任务产生的。下一项目应该经由在previous项目(共递归)的结果一个异步呼叫来制造。在生成的说法,这将是这个样子 - 除了生成不支持异步(也不支持在初始状态委托

I'm trying to create an Observable where each item is produced via an asynchronous task. The next item should be produced via an async call on the result of the previous item (co-recursion). In "Generate" parlance this would look something like this - except that Generate does not support async (nor does it support the delegate on the initial state.

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);


作为一个更具体的例子,通过一次读取其中100条消息偷看从ServiceBus队列中的所有邮件,实现ProduceFirst,继续和ProduceNext如下:


As a more concrete example, to peek all messages from a ServiceBus queue by fetching them 100 messages at a time, implement ProduceFirst, Continue and ProduceNext as follows:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

然后调用 .SelectMany(I = I标记)的IObservable&LT; IEnumerable的&LT; BrokeredMessage&GT;&GT; 把它变成一个的IObservable&LT; BrokeredMessage&GT;

在哪里_serviceBusReceiver是一个接口的实例如下:

Where _serviceBusReceiver is an instance of an interface as follows:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

和BrokeredMessage是从的https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

And BrokeredMessage is from https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

推荐答案

如果你要推出自己的异步生成的功能,我会建议使用递归调度代替包装的while循环。

If you are going to roll your own async Generate function I would recommend the use of recursive scheduling instead of wrapping a while loop.

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    return s.Schedule(await initialState(), async (state, self) => 
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      self(await iterate(state));

    });
  });
}

这有几个优点。首先,你可以取消这一点,用一个简单的while循环是没有办法直接取消它,事实上,直到观察到的完成你甚至不返回的订阅功能。其次,这可以让你控制每个项目的调度/异步(这使得测试轻而易举),这也使得它成为图书馆更好的整体配合

This has a couple of advantages. First, you are able to cancel this, with a simple while loop there is no way to cancel it directly, in fact you don't even return for the subscribe function until the observable has completed. Second, this lets you control the scheduling/asynchrony of each item (which makes testing a breeze), this also makes it a better overall fit for library

这篇关于从链式任务可观测的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆