使用RxJS的排队功能 [英] Queuing function using RxJS

查看:154
本文介绍了使用RxJS的排队功能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在后端使用带有NodeJS的rxjs. 我有一个Rest API,它允许使用者运行远程纱线安装过程. install函数返回该过程的可观察值.因此,成功安装模块后,它会发出可观察且完整的值.此时,Rest API将向用户返回一个响应,表明安装成功.如果安装失败,该过程将在流中抛出一个错误,Rest API将返回另一个包含错误信息的响应.

I'm using rxjs with NodeJS in backend. I have a Rest API which allow consumers to run remote yarn installation process. The install function returns an observable of the process. So when the module is installed successfully it emits a value in the observable and complete. At this point, the Rest API will returns a response to the user to say that the installation is successful. In case that the installation fails, the process will throw an Error in the stream and the Rest API returns another response with the error information.

我的问题是:

使用者会多次并行调用该API,因此后端将进行并行安装.

The API is called multiple times in parallel by consumers, so there will be a parallel installations in the backend.

我尝试使用油门运算符创建队列,但该队列使第一个流保持活动状态.因此,如果第一个进程已完成",则返回"true",但流未完成

I tried throttle operator to create a queue but it keeps the first stream active. So if the first process is "completed", it returns "true" but the stream doesn't complete

export class MyService {
    // the function called by the REST API
    installGlobal(moduleName: string): Observable < boolean > {
        // I think, there are something to do here to make it queuing
        return this.run('yarn', ['global', 'add', moduleName]);
    }

    private run(cmd: string, args: string[]): Observable < boolean > {
        const cmd$ = fromPromise(spawn(cmd, args)).pipe(
            map(stdout => {
                this.logger.info(`Install Module Successfully`);
                this.logger.info(`stdout: ${stdout.toString()}`);
                return true;
            }),
            catchError(error => {
                const errorMessage: string = error.stderr.toString();
                return _throw(errorMessage.substr(errorMessage.indexOf(' ') + 1));
            })
        );
        return cmd$;
    }
} 

我的期望:

有多个请求,它们必须排队.因此,第一个将被处理,并且所有并行一次都必须排队.处理完第一个流之后,它必须将响应返回给API使用方(例如200个已完成),并从队列中恢复下一个流.

Either there are multiple request, they must be queued. So the first one will be treated and all parallel onces must be queued. When the first is processed, it must returns the response to the API consumers (like 200 completed) and resume the next stream from the queue.

[2019年7月1日更新]:添加示例

您可以在 stackblitz

我已经重新实现了现有的代码,并且通过向将要调用队列的服务订购多个时间来模拟我的API调用

I have reimplemented the existant code and i'm simulating my API call by subscribing multi time to the service which will call the queue

推荐答案

Rxjs中的简单queque可以如下完成

A simple queque in Rxjs can be done like below

const queque=new Subject()
// sequential processing
queue.pipe(concatMap(item=>yourObservableFunction(item)).subscribe()
// add stuff to the queue 
queque.next(item)

这篇关于使用RxJS的排队功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆