Reactive pipeline - how to control parallelism?


Problem description


I'm building a straightforward processing pipeline in which an item is fetched as input, operated on by multiple processors in sequence, and finally output.


How it currently works: the pipeline fetches items from the provider as quickly as it can. As soon as an item is fetched, it is passed to the processors. Once an item has been processed, the output is notified. Each individual item is processed sequentially, but multiple items may be processed in parallel (depending on how fast they are fetched from the provider).


The IObservable created and returned from the pipeline looks like this:

return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }                
}).SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current.ContinueWith( // Append continuations.
            previous => processor.ProcessAsync(previous.Result))
            .Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}};


The missing part: I need a control mechanism that limits the maximum number of items that can be in the pipeline at any given time.


For example, if the maximum number of parallel processings is 3, that would result in the following workflow:

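  1. Item 1 is fetched and passed to the processors.
  2. Item 2 is fetched and passed to the processors.
  3. Item 3 is fetched and passed to the processors.
  4. Item 1 has finished processing.
  5. Item 4 is fetched and passed to the processors.
  6. Item 3 has finished processing.
  7. Item 5 is fetched and passed to the processors.
  8. And so on...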
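The workflow above can also be reproduced without Rx. The sketch below swaps in a plain TPL technique rather than the Rx approach from the answer: a SemaphoreSlim used as a counting gate. BoundedPipeline is a hypothetical name. The fetchNextAsync and processAsync parameters are hypothetical stand-ins for _provider.GetNextAsync and the processor chain, with null marking the end of input.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    // Fetches and processes items, never holding more than maxInFlight items in the pipeline.
    // fetchNextAsync returns null once the (hypothetical) provider is exhausted.
    // Returns the peak number of items that were in flight at the same time.
    public static async Task<int> RunAsync(
        Func<Task<int?>> fetchNextAsync,
        Func<int, Task> processAsync,
        int maxInFlight)
    {
        var slots = new SemaphoreSlim(maxInFlight, maxInFlight);
        var running = new List<Task>();
        int inFlight = 0;
        int peak = 0;

        while (true)
        {
            // Do not fetch until a slot is free (e.g. item 4 waits for item 1 to finish).
            await slots.WaitAsync();
            int? next = await fetchNextAsync();
            if (next is null)
            {
                slots.Release();
                break;
            }

            peak = Math.Max(peak, Interlocked.Increment(ref inFlight));
            running.Add(ProcessAndReleaseAsync(next.Value));
        }

        await Task.WhenAll(running);
        slots.Dispose();
        return peak;

        async Task ProcessAndReleaseAsync(int item)
        {
            try
            {
                await processAsync(item);
            }
            finally
            {
                Interlocked.Decrement(ref inFlight); // leave the pipeline...
                slots.Release();                      // ...then let the next item be fetched
            }
        }
    }
}
```

The gate is acquired before fetching, so item 4 is not fetched until one of items 1 to 3 releases its slot. This matches steps 4 and 5 of the list.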

Recommended answer

Merge provides an overload that takes a maximum concurrency.

Its signature looks like: IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);


Here is what it would look like with your example (I refactored some of the other code as well, which you can take or leave):

return Observable
    // Reactive while loop; it also takes care of OnCompleted for you.
    .While(() => _provider.HasNext,
           Observable.FromAsync(_provider.GetNextAsync))
    // Wrap each item in an inner observable that only executes once it is subscribed to.
    .Select(item => Observable.Defer(() =>
        _processors.Aggregate(
            seed: Observable.Return(item),
            func: (current, processor) => current.SelectMany(processor.ProcessAsync))))
    // Only allow 3 inner streams to execute in parallel.
    .Merge(3);

Breaking down what it does:

  1. On each iteration, While checks whether _provider.HasNext is true. If so, it resubscribes to get the next value from _provider; otherwise, it emits OnCompleted.
  2. Inside Select, a new observable stream is created, but because of Defer it is not yet evaluated.
  3. The returned IObservable<IObservable<T>> is passed to Merge, which subscribes to at most 3 inner observables simultaneously.
  4. An inner observable is finally evaluated when it is subscribed to.
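Points 2 and 3 are the crux: Merge(3) can only limit concurrency if the inner work has not started yet. Below is a rough stdlib analogue that uses Func<Task<T>> delegates as the deferred inner sources. It is not Rx and not how Merge is actually implemented. MergeSketch and MergeAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class MergeSketch
{
    // Rough analogue of Merge(maxConcurrency) over deferred work items. Each Func<Task<T>>
    // plays the role of a Defer'ed inner observable: nothing runs until it is invoked here.
    public static async Task<List<T>> MergeAsync<T>(
        IEnumerable<Func<Task<T>>> deferred, int maxConcurrency)
    {
        var results = new List<T>();
        var active = new List<Task<T>>();
        using var e = deferred.GetEnumerator();

        while (true)
        {
            // "Subscribe" to new inner sources while below the concurrency limit.
            while (active.Count < maxConcurrency && e.MoveNext())
                active.Add(e.Current()); // invoking the delegate starts the work

            if (active.Count == 0) break; // source exhausted and nothing running

            // Like Merge, results arrive in completion order, not source order.
            Task<T> finished = await Task.WhenAny(active);
            active.Remove(finished);
            results.Add(await finished);
        }
        return results;
    }
}
```

Passing already-created Task<T> objects instead would defeat the limit, because a Task is typically already running by the time you hold it. That is the role Defer plays in the answer.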

Alternative 1


If you also need to control the number of parallel requests to the provider, you need to get a little trickier, because you have to signal that your Observable is ready for new values:

return Observable.Create<T>(observer =>
{
  var subject = new Subject<Unit>();
  var disposable = new CompositeDisposable(subject);

  disposable.Add(subject
    // This will complete when the provider has run out of values.
    .TakeWhile(_ => _provider.HasNext)
    .SelectMany(
      _ => _provider.GetNextAsync(),
      (_, item) =>
      {
        return _processors
          .Aggregate(
            seed: Observable.Return(item),
            func: (current, processor) => current.SelectMany(processor.ProcessAsync))
          // Could also use `Finally` here; this signals the chain
          // to start on the next item.
          .Do(dontCare => { }, () => subject.OnNext(Unit.Default));
      })
    .Merge(3)
    .Subscribe(observer));

  // Queue up 3 requests for the initial kickoff.
  disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

  return disposable;
});
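The "ready for new values" signalling above can be mirrored with a queue of tokens. This is a minimal sketch that assumes a runtime where System.Threading.Channels is available (it ships with modern .NET). The channel stands in for the Subject<Unit>. ListProvider and CreditPipeline are hypothetical; ListProvider only copies the HasNext/GetNextAsync shape of the question's _provider.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical provider with the same HasNext/GetNextAsync shape as the question's _provider.
public sealed class ListProvider
{
    private readonly Queue<int> _items;
    public ListProvider(IEnumerable<int> items) => _items = new Queue<int>(items);
    public bool HasNext => _items.Count > 0;
    public Task<int> GetNextAsync() => Task.FromResult(_items.Dequeue());
}

public static class CreditPipeline
{
    public static async Task<int[]> RunAsync(
        ListProvider provider, Func<int, Task<int>> process, int maxConcurrency)
    {
        // Queue of "ready for a new value" signals, playing the role of the Subject<Unit>.
        var ready = Channel.CreateUnbounded<bool>();

        // Initial kickoff, like Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext).
        for (int i = 0; i < maxConcurrency; i++)
            ready.Writer.TryWrite(true);

        var running = new List<Task<int>>();
        while (true)
        {
            await ready.Reader.ReadAsync();  // wait for a signal before fetching
            if (!provider.HasNext) break;     // like TakeWhile(_ => _provider.HasNext)
            int item = await provider.GetNextAsync();
            running.Add(ProcessThenSignalAsync(item));
        }

        // Task.WhenAll preserves task order, so results come back in fetch order.
        return await Task.WhenAll(running);

        async Task<int> ProcessThenSignalAsync(int item)
        {
            try
            {
                return await process(item);
            }
            finally
            {
                ready.Writer.TryWrite(true); // like the Do(..., onCompleted) callback
            }
        }
    }
}
```

In the Rx version, the three kickoff signals can each pass the HasNext check before any GetNextAsync call has completed. In this sketch, fetching happens in a single loop, so HasNext and GetNextAsync are never called concurrently.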
