How to limit the concurrency of flatMap?

Problem description

I'm trying to use RxJS to write a script that processes several hundred log files, each about 1GB in size. The skeleton of the script looks like this:

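Rx.Observable.from(arrayOfLogFilePath)
  .flatMap(function (logFilePath) {
    // flatMap subscribes to each inner stream as soon as it is produced,
    // so every file is opened and read at the same time
    return Rx.Node.fromReadStream(logFilePath)
      .filter(filterLogLine);
  })
  .groupBy(someGroupingFunc)
  .map(someFurtherProcessing)
  .subscribe(...)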

The code works, but notice that the filtering step for all log files starts concurrently. From a file-system I/O performance perspective, however, it is preferable to process one file after another (or at least to limit the concurrency to a few files rather than opening all of the hundreds of files at the same time). How can I implement this in a "functional reactive" way?

I had thought of using a scheduler, but could not figure out how it would help here.

Recommended answer

You can use .merge(maxConcurrent) to limit the concurrency. Because .merge(maxConcurrent) flattens a metaobservable (an observable of observables) into a single observable, replace the .flatMap with .map so that the output is a metaobservable (not yet flattened), and then call .merge(maxConcurrent).
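
The behaviour of .merge(maxConcurrent) can be modelled without RxJS. What follows is a minimal plain-JavaScript sketch, not RxJS code: mergeWithLimit and fakeLogFile are hypothetical helpers, and each inner "stream" is assumed to be a function that takes an emit callback and returns a Promise that resolves when the stream completes. Queued inner sources are started only when a slot frees up, which is the effect .merge(2) is meant to have on the log-file pipeline.

```javascript
// Hypothetical helper, NOT an RxJS API: a plain-JS model of how
// merge(maxConcurrent) flattens a sequence of inner streams.
// Each inner source is a function (emit) => Promise that resolves on completion.
// Assumes maxConcurrent >= 1.
function mergeWithLimit(innerSources, maxConcurrent, emit) {
  return new Promise((resolve, reject) => {
    let next = 0;   // index of the next inner source waiting to be started
    let active = 0; // inner sources currently running
    let peak = 0;   // highest number of sources that ran at once

    function startMore() {
      // Done once every source has been started and all have completed.
      if (next === innerSources.length && active === 0) {
        resolve(peak);
        return;
      }
      // Start queued sources until the concurrency limit is reached.
      while (active < maxConcurrent && next < innerSources.length) {
        const source = innerSources[next++];
        active++;
        peak = Math.max(peak, active);
        source(emit).then(() => {
          active--;
          startMore(); // a slot is free: start the next queued source, if any
        }, reject);
      }
    }

    startMore();
  });
}

// Hypothetical stand-in for reading and filtering one log file:
// emits lineCount lines asynchronously, then completes.
function fakeLogFile(name, lineCount) {
  return (emit) =>
    new Promise((resolve) => {
      let i = 0;
      const tick = () => {
        if (i === lineCount) {
          resolve();
          return;
        }
        emit(`${name}:${i++}`);
        setTimeout(tick, 1); // simulate slow IO between lines
      };
      tick();
    });
}

const files = ["a.log", "b.log", "c.log", "d.log"].map((f) => fakeLogFile(f, 3));
const lines = [];
mergeWithLimit(files, 2, (line) => lines.push(line)).then((peak) => {
  console.log(peak);         // 2
  console.log(lines.length); // 12
});
```

The limit only bounds how many inner sources are subscribed at once; with a limit of 2, lines from the two active files can still interleave in the output.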

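Rx.Observable.from(arrayOfLogFilePath)
  .map(function (logFilePath) {
    // map (not flatMap): emit the inner stream itself,
    // producing an observable of observables
    return Rx.Node.fromReadStream(logFilePath)
      .filter(filterLogLine);
  })
  .merge(2) // flatten, subscribing to at most 2 inner streams at a time
  .groupBy(someGroupingFunc)
  .map(someFurtherProcessing)
  .subscribe(...)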

This code hasn't been tested (I don't have access to your development environment), but this is the way to proceed. RxJS doesn't have many operators that take a concurrency parameter, but you can almost always do what you need with .merge(maxConcurrent).
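
The asker's preferred case, one file after another, corresponds to a limit of 1. With that limit, merging behaves like concatenation (the RxJS documentation describes concatMap as equivalent to mergeMap with its concurrency parameter set to 1). Below is a minimal plain-JavaScript sketch of the sequential case, not RxJS code: concatSources and fakeLogFile are hypothetical helpers, and each inner source is assumed to be an async function that takes an emit callback.

```javascript
// Hypothetical helper, NOT an RxJS API: the sequential (limit 1) case.
// Each inner source must complete before the next one is started.
async function concatSources(innerSources, emit) {
  for (const source of innerSources) {
    await source(emit); // finish this "file" before opening the next
  }
}

// Hypothetical stand-in for one filtered log file. The tracker object records
// how many fake files are being read at the same moment.
function fakeLogFile(name, lines, tracker) {
  return async (emit) => {
    tracker.open++;
    tracker.maxOpen = Math.max(tracker.maxOpen, tracker.open);
    for (const line of lines) {
      await new Promise((r) => setTimeout(r, 1)); // simulate slow IO
      emit(`${name}: ${line}`);
    }
    tracker.open--;
  };
}

const tracker = { open: 0, maxOpen: 0 };
const output = [];
concatSources(
  [fakeLogFile("a.log", ["x", "y"], tracker), fakeLogFile("b.log", ["z"], tracker)],
  (line) => output.push(line)
).then(() => {
  console.log(output);          // [ 'a.log: x', 'a.log: y', 'b.log: z' ]
  console.log(tracker.maxOpen); // 1
});
```

Processing strictly in sequence keeps at most one file open, at the cost of leaving I/O idle while a file's lines are being handled; a small limit such as 2 trades a little extra disk contention for overlap.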
