Old-style callbacks synchronisation using observables
Problem description
I have the following scenario: four callback functions A, B, C and D are called from an old-style library (it makes some API requests internally, so the execution time is unknown/random, but the order of the results, by task completion time, matters to me). I want to synchronise the data they return into a single observable result string using rxjs.
function getData() {
// --- The part BELOW can be edited ---
let obs = new ReplaySubject(1); // this is an example; you can use any type
function A(n) {
let r= 'A'.repeat(n);
}
function B(n) {
let r= 'B'.repeat(n);
}
function C(n) {
let r= 'C'.repeat(n);
}
function D(n) {
let r= 'D'.repeat(n);
obs.next(r);
}
// --- The part BELOW can NOT be edited ---
runLib(A,B,C,D)
return obs
}
In the snippet below, the value of finalResult is DDDDD, which is wrong. The proper value of the finalResult string should be AADDDDDCCCCBBB.
// SET-UP - do NOT edit the code below
const { of, Observable, ReplaySubject } = rxjs;
const { map, switchMap, delay } = rxjs.operators; // example
// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); }
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }
function runLib(cA,cB,cC,cD) {
libA( cA ); libB( cB ); libC( cC ); libD( cD );
}
getData().subscribe(finalResult => {
console.log(finalResult) // The result is WRONG here!
}, e=>{}, _=> console.log('finished - unsubscribed'));
function getData() {
// --- The part BELOW can be edited ---
let obs = new ReplaySubject(1); // this is an example; you can use a different kind of observable
function A(n) {
let r= 'A'.repeat(n);
}
function B(n) {
let r= 'B'.repeat(n);
}
function C(n) {
let r= 'C'.repeat(n);
}
function D(n) {
let r= 'D'.repeat(n);
obs.next(r);
}
// --- The part BELOW can NOT be edited ---
runLib(A,B,C,D)
return obs
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>
In the snippet I have marked the code inside getData() that can be edited in a solution (it may look a little awkward, but this is exactly what I need). You can also find finalResult there, but do not edit that part of the code. Is it possible? How can it be done?
Recommended answer
The best thing to do in such a case is to wrap the library functions so that they return an Observable, and then use forkJoin to wait for all the results.
I took your code and modified it to get the desired result. You would need to:
- in each callback, emit the result to a subject.
- return an Observable which waits for n emissions (in this case 4).
- map the emissions into a single string.
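To make the three steps concrete, here is a minimal dependency-free sketch of the same idea in plain JavaScript: record each callback result in arrival order, wait until n results have arrived, then join them. The helper name collectInArrivalOrder and its onDone parameter are hypothetical and only serve as illustration; they are not part of RxJS or of the library in the question. The callbacks are invoked by hand in the completion order of the simulated library (A, D, C, B).

```javascript
// Hypothetical helper (not an RxJS API): gathers `count` callback results
// in the order they arrive, then passes the joined string to `onDone`.
function collectInArrivalOrder(count, onDone) {
  const parts = [];
  let done = false;
  // Returns a factory that builds one callback per letter, like A..D.
  return function makeCallback(letter) {
    return function (n) {
      if (done) return;               // ignore late results, similar to take(1)
      parts.push(letter.repeat(n));   // plays the role of emitting to the subject
      if (parts.length === count) {   // plays the role of waiting for n emissions
        done = true;
        onDone(parts.join(''));       // plays the role of mapping to one string
      }
    };
  };
}

// Usage: the callbacks fire in completion order A, D, C, B.
let finalResult = null;
const make = collectInArrivalOrder(4, s => { finalResult = s; });
const A = make('A'), B = make('B'), C = make('C'), D = make('D');
A(2); D(5); C(4); B(3);
console.log(finalResult); // AADDDDDCCCCBBB
```

The result depends only on the order in which the callbacks are invoked, not on the order in which they were created, which is why the string follows completion time.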
The final getData function will look like this:
function getData() {
// --- The part BELOW can be edited ---
const result$: Subject<string> = new Subject<string>();
const obs = result$.asObservable().pipe(
bufferCount(4), // or any desired number of callbacks
map((results: string[]) => results.join(''))
);
function A(n) {
let r = "A".repeat(n);
result$.next(r);
}
function B(n) {
let r = "B".repeat(n);
result$.next(r);
}
function C(n) {
let r = "C".repeat(n);
result$.next(r);
}
function D(n) {
let r = "D".repeat(n);
result$.next(r);
}
// --- The part BELOW can NOT be edited ---
runLib(A, B, C, D);
return obs;
}
You can find the full code in this stackblitz, or run the snippet below.
// SET-UP - do NOT edit the code below
const { Subject } = rxjs;
const { take, bufferCount, map } = rxjs.operators; // example
// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); }
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }
function runLib(cA,cB,cC,cD) {
libA( cA ); libB( cB ); libC( cC ); libD( cD );
}
getData().subscribe(finalResult => {
console.log(finalResult) // logs AADDDDDCCCCBBB
}, e=>{},_=> console.log('finished - unsubscribed'));
function getData() {
// --- The part BELOW can be edited ---
const result$ = new Subject();
function A(n) {
let r = "A".repeat(n);
result$.next(r);
}
function B(n) {
let r = "B".repeat(n);
result$.next(r);
}
function C(n) {
let r = "C".repeat(n);
result$.next(r);
}
function D(n) {
let r = "D".repeat(n);
result$.next(r);
}
const obs = result$.pipe(
bufferCount(4), // or any desired number of callbacks
take(1),
map(results=> results.join``)
);
// --- The part BELOW can NOT be edited ---
runLib(A, B, C, D);
return obs;
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>