使用RxJS如何缓冲函数调用,直到解决了另一个异步函数调用 [英] Using RxJS how to buffer function calls until an other async function call has resolved

查看:43
本文介绍了使用RxJS如何缓冲函数调用,直到解决了另一个异步函数调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使用RxJS缓冲函数调用,直到另一个异步函数解决了?

How can I use RxJS to buffer function calls until another async function has resolved?

这是我要完成的事情的简单示例

Here is a simple example of what I'd like to accomplish

function asyncFunc(time) {
  setTimeout(() => {
      console.log('asyncFunc has resolved');
   }, time);
}

function funcToBuffer(time) {
  setTimeout(() => {
    console.log(time);
  }, time);
}

asyncFunc(3000);

funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);

asyncFunc(8000);

funcToBuffer(6000);
funcToBuffer(7000);

此代码现在将打印:

1000
2000
asyncFunc has resolved
4000
5000
6000
7000
asyncFunc has resolved

我要打印的是

asyncFunc has resolved
1000
2000    
4000
5000
asyncFunc has resolved
6000
7000

本质上,我想要某种控制流,该控制流允许我在需要时调用funcToBuffer,但是在幕后,我希望它在asyncFunc执行并等待解决时继续执行.一旦asyncFunc解决,funcToBuffer调用将不再被缓冲并立即执行.

In essence, I want some kind of control flow that allows me to call funcToBuffer whenever I feel like, but under the hood, I want it to hold on executing whenever asyncFunc is executing and waiting to be resolved. Once asyncFunc has resolved, funcToBuffer calls should no longer be buffered and be executed right away.

我曾尝试过使用缓冲运算符,但无法达到预期的结果.

I have tried playing with the buffer operator but I wasn't able to achieve the desired outcome.

推荐答案

如果我理解的正确,您的主要目标是通过一种机制来控制一系列功能的执行,该机制可以对这些功能进行缓冲,直到发生某些事情为止,并且正是触发缓冲功能执行的原因.

If I understand it right, your main goal is to control the execution of a sequence of functions through a mechanism that buffers them until something happens, and that something is exactly what triggers the execution of the functions buffered.

如果这是正确的,则以下内容可能是解决您的问题的基础

If this is correct, the following could be the basis for a possible solution to your problem

const functions$ = new Subject<() => any>();

const buffer$ = new Subject<any>();
const executeBuffer$ = new Subject<any>();
const setBuffer = (executionDelay: number) => {
    buffer$.next();
    setTimeout(() => {
        executeBuffer$.next();
    }, executionDelay);
}

const functionBuffer$ = functions$
.pipe(
    bufferWhen(() => buffer$),
);

zip(functionBuffer$, executeBuffer$)
.pipe(
    tap(functionsAndExecuteSignal => functionsAndExecuteSignal[0].forEach(f => f()))
)
.subscribe();

让我解释一下代码.

首先,我们构建 functions $ ,即我们要控制的功能的Observable.由于我们希望能够以编程方式控制此类Observable的通知,因此Observable是使用Subject构建的.换句话说,我们没有像这样执行 funcToBuffer(1000)这样的函数,而是创建了该函数(作为一个对象),并要求 functions $ Observable发出像这样的功能

First thing, we build functions$, i.e. an Observable of the functions we want to control. The Observable is built using a Subject, since we want to be able to control the notification of such Observable programmatically. In other words, rather than kicking the execution of a function like this funcToBuffer(1000), we create the function (as an object) and ask the functions$ Observable to emit the function like this

const aFunction = () => setTimeout(() => {console.log('I am a function that completes in 1 second');}, 1000);
functions$.next(aFunction);

通过这种方式,我们创建了最终将要执行的功能流.

In this way we have created a stream of functions that eventually will be executed.

第二件事,我们再次使用Subjects创建另外两个Observable, buffer $ executeBuffer $ .当我们必须从到目前为止由 functions $ 发出的函数中创建一个缓冲区以及何时必须开始执行所缓冲的函数时,将使用此类Observable发出信号.

Second thing, we create 2 more Observables, buffer$ and executeBuffer$, again using Subjects. Such Observables are used to signal when we have to create a buffer out of the functions emitted so far by functions$ and when we have to start the execution of the functions buffered.

最后两个Observables在函数 setBuffer 中使用.当您调用 setBuffer 时,您基本上会说:请创建一个缓冲区,其中包含 functions $ 到目前为止已发出的所有函数,并在 executionDelay之后开始执行它们时间指定为参数.

These last 2 Observables are used in the function setBuffer. When you call setBuffer you basically say: please, create a buffer with all the functions which have been emitted so far by functions$ and start executing them after the executionDelay time specified as parameter.

缓冲部分由使用 bufferWhen 运算符创建的 functionBuffer $ Observable执行.执行部分是利用 zip 运算符实现的,该运算符使我们能够根据 executeBuffer $ Observable的发射量来设置函数的执行节奏.

The buffering part is performed by the functionBuffer$ Observable which is created using bufferWhen operator. The execution part is implemented leveraging the zip operator, that allows us to set the rhythm of execution of the functions based on the emissions of executeBuffer$ Observable.

您可以通过测试上面的代码来设置以下测试数据.

You can test the above code setting up the following test data.

let f: () => any;
setBuffer(3000);
f = () => setTimeout(() => {console.log('f1');}, 1000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f2');}, 2000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f4');}, 4000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f5');}, 5000);
functions$.next(f);
setBuffer(8000);
f = () => setTimeout(() => {console.log('f6');}, 6000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f7');}, 7000);
functions$.next(f);
setBuffer(16000);

这篇关于使用RxJS如何缓冲函数调用,直到解决了另一个异步函数调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆