How to limit the concurrency of flatMap?

Question

I'm trying to use RxJS to write a script that processes several hundred log files, each about 1GB in size. The skeleton of the script looks like this:

Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

The code works, but notice that the filtering steps for all of the log files start concurrently. From a file-system I/O performance perspective, however, it is preferable to process one file after another (or at least to limit the concurrency to a few files rather than opening all several hundred files at the same time). How can I implement this in a "functional reactive" way?

I had thought of using a scheduler, but could not figure out how it would help here.
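To make the problem concrete, here is a minimal, dependency-free model in plain JavaScript (not RxJS) of what an unbounded flatMap does: every inner source is started as soon as it is produced, so nothing caps how many "files" are open at once. The helper names (`flatMapUnbounded`, `countOpenFiles`) and the simulated files are hypothetical, and each inner source is modelled as a function returning an async iterable, standing in for a cold observable that only starts when subscribed.

```javascript
// Hypothetical model of an unbounded flatMap: every inner source is started
// immediately, with no limit on how many run at the same time.
// Inner sources are modelled as factories returning async iterables.
async function flatMapUnbounded(sourceFactories, onValue) {
  await Promise.all(
    sourceFactories.map(async (factory) => {
      for await (const value of factory()) onValue(value);
    })
  );
}

// Hypothetical demo: count how many simulated "log files" are open at once.
async function countOpenFiles(fileCount) {
  let open = 0;
  let peak = 0;
  const makeFile = () => async function* () {
    open++; // the "file" is opened when the inner source starts
    peak = Math.max(peak, open);
    try {
      await new Promise((resolve) => setTimeout(resolve, 5)); // simulated read
      yield "ERROR boom";
    } finally {
      open--; // the "file" is closed when the inner source completes
    }
  };
  const files = Array.from({ length: fileCount }, makeFile);
  await flatMapUnbounded(files, () => {});
  return peak;
}

countOpenFiles(300).then((peak) => console.log(peak)); // prints 300
```

Every simulated file is opened before any of them finishes reading, which mirrors the situation described in the question.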

Recommended answer

You can use .merge(maxConcurrent) to limit the concurrency. Because .merge(maxConcurrent) flattens a metaobservable (an observable of observables) into a single observable, you need to replace .flatMap with .map so that the output is a metaobservable ("unflattened"), and then call .merge(maxConcurrent).
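The following is a minimal sketch, in plain JavaScript without RxJS, of the behaviour .merge(maxConcurrent) provides: at most `maxConcurrent` inner sources are active, and the next queued source starts only when an active one completes. It illustrates the idea under stated assumptions and is not RxJS's actual implementation; `mergeLimited`, `runDemo` and the simulated log files are hypothetical, and inner sources are modelled as functions returning async iterables so that none of them starts until it is taken from the queue.

```javascript
// Hypothetical sketch of merge(maxConcurrent) over a "metaobservable":
// at most maxConcurrent inner sources run at once; a queued source starts
// only when an active one completes. Inner sources are modelled as
// factories returning async iterables, so nothing starts until called.
async function mergeLimited(sourceFactories, maxConcurrent, onValue) {
  let next = 0; // index of the next inner source waiting to be started

  // Each worker is one concurrency "slot": it runs one inner source to
  // completion, then takes the next queued source, until none are left.
  async function worker() {
    while (next < sourceFactories.length) {
      const factory = sourceFactories[next++];
      for await (const value of factory()) {
        onValue(value); // forward inner values to the merged output
      }
    }
  }

  const slots = Math.min(maxConcurrent, sourceFactories.length);
  const workers = [];
  for (let i = 0; i < slots; i++) workers.push(worker());
  await Promise.all(workers); // merged output completes when all slots drain
}

// Hypothetical demo: five simulated "log files" of three lines each.
async function runDemo(maxConcurrent) {
  let active = 0;
  let peak = 0;
  const results = [];
  const makeFile = (name) => async function* () {
    active++;
    peak = Math.max(peak, active);
    try {
      for (const line of ["INFO ok", "ERROR boom", "INFO ok"]) {
        await new Promise((resolve) => setTimeout(resolve, 1)); // simulated I/O
        if (line.startsWith("ERROR")) yield `${name}: ${line}`; // the filter step
      }
    } finally {
      active--;
    }
  };
  const files = ["a.log", "b.log", "c.log", "d.log", "e.log"].map(makeFile);
  await mergeLimited(files, maxConcurrent, (v) => results.push(v));
  return { peak, results };
}

runDemo(2).then(({ peak, results }) => console.log(peak, results.length)); // prints "2 5"
```

The sketch relies on each inner source being lazy. If creating an inner observable already opened the file (for example, by calling `fs.createReadStream` eagerly inside the `.map`), the limit would only apply to reading, not to opening; in RxJS 4, wrapping the creation in `Rx.Observable.defer` is one way to keep it lazy.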

Rx.Observable.from(arrayOfLogFilePath)
.map(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.merge(2) // subscribe to at most 2 file streams at a time
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

This code hasn't been tested (I don't have access to your development environment), but this is how to proceed. RxJS doesn't have many operators with concurrency parameters, but you can almost always do what you need with .merge(maxConcurrent).
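With maxConcurrent set to 1, .merge(1) processes one file after another, which is the ordering the question describes as preferable for file-system I/O; in Rx terms this is the same behaviour as concatenation. A minimal plain-JavaScript sketch of that special case (hypothetical `concatSources` and `makeFile` helpers, with inner sources again modelled as functions returning async iterables) shows that each source finishes before the next one starts, so the output keeps source order even when a later file is faster:

```javascript
// Hypothetical sketch of the maxConcurrent = 1 case: each inner source runs
// to completion before the next one is started, so values come out in
// source order. Inner sources are factories returning async iterables.
async function concatSources(sourceFactories, onValue) {
  for (const factory of sourceFactories) {
    for await (const value of factory()) onValue(value);
  }
}

// Hypothetical simulated file: two lines, each after a delay of delayMs.
function makeFile(name, delayMs) {
  return async function* () {
    for (const n of [1, 2]) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
      yield `${name}#${n}`;
    }
  };
}

const output = [];
concatSources(
  [makeFile("a.log", 20), makeFile("b.log", 1), makeFile("c.log", 10)],
  (line) => output.push(line)
).then(() => console.log(output.join(" ")));
// prints "a.log#1 a.log#2 b.log#1 b.log#2 c.log#1 c.log#2"
```

For reference, later RxJS versions (5 and up) expose the same limit directly as the `concurrent` argument of `mergeMap(project, concurrent)`, and `concatMap` corresponds to the one-at-a-time case.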
