使用 observables 的旧式回调同步 [英] Old-style callbacks synchronisation using observables

查看:58
本文介绍了使用 observables 的旧式回调同步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下场景 - 4 个回调函数 A、B、C、D,它们被称为旧式库(其中使用一些 API 请求,因此执行时间未知/随机但结果的正确顺序(按完成的任务)时间)对我很重要) - 我想使用 rxjs 将数据同步到一个可观察的结果字符串.

I have following scenario - 4 callback functions A,B,C,D which are called form old-style like library (which use some API requests inside so execution time is unknown/random but proper order of results (by finished task time) is important for me) - and I want to synchronise data which they return to one obserwable result string using rxjs.

function getData() {

  // --- BELOW Part can be EDIT ---

  let obs = new ReplaySubject(1); // this is example you can use an type

  function A(n) { 
    let r= 'A'.repeat(n);
  }

  function B(n) {
    let r= 'B'.repeat(n);
  }

  function C(n) {
    let r= 'C'.repeat(n);
  }

  function D(n) {
    let r= 'D'.repeat(n);
    obs.next(r);
  }
  
  // --- BELOW Part can NOT be edit ---

  runLib(A,B,C,D)   
  return obs
}

在下面的 finalResult 片段值中是 DDDDD 这是错误的.finalResult 字符串的正确值应该是 AADDDDDCCCCBBB.

In below snippet value of finalResult is DDDDD which is wrong. Proper value of finalResult string should be AADDDDDCCCCBBB.

// SET-UP - NOT EDIT Below code
const { of, Observable, ReplaySubject } = rxjs;
const { map, switchMap, delay } = rxjs.operators; // example

// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }

function runLib(cA,cB,cC,cD) {
    libA( cA ); libB( cB ); libC( cC ); libD( cD );
}

getData().subscribe(finalResult => {
  console.log(finalResult) // The result is WRONG here!
}, e=>{}, _=> console.log('finished - unsubscribed'));


function getData() {

  // --- BELOW Part can be EDIT ---

  let obs = new ReplaySubject(1); // this is example, you can use diffrend observale kind

  function A(n) { 
    let r= 'A'.repeat(n);
  }

  function B(n) {
    let r= 'B'.repeat(n);
  }

  function C(n) {
    let r= 'C'.repeat(n);
  }

  function D(n) {
    let r= 'D'.repeat(n);
    obs.next(r);
  }
  
  // --- BELOW Part can NOT be edit ---

  runLib(A,B,C,D)   
  return obs
}

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

在片段中,我标记了 getData() 内的代码,它可以在解决方案中编辑(可能看起来有点尴尬,但这正是我需要的)(你也可以在那里找到 finalResult 但不要编辑那部分代码).有可能的?怎么做?

In snippet I mark code inside getData() which can be edit in solution (may be it looks little awkward but this is exactly what I need) (you can also find there finalResult but not edit that part of code). It is possible? How to do it?

推荐答案

这种情况下最好的做法是将库函数包装起来返回Observable,然后使用forkJoin代码>等待所有结果.

The best thing to do in such case is to wrap the library functions to return Observable and then use forkJoin to wait for all the results.

我接受了您的代码并对其进行了修改以获得所需的结果,您需要:

I took your code and modified it to get the desired result, you would need to:

  1. 在每个回调中将结果发送给一个主题.
  2. 返回一个 Observable 等待 n 发射 - 在本例中为 4
  3. 将排放映射到单个字符串中
  1. in each callback emit the result to a subject.
  2. return an Observable which wait for n emission - in this case 4
  3. map the emissions into a single string

最终的 getData 函数将如下所示:

The final getData function will look like this:

function getData() {
  // --- BELOW Part can be EDIT ---
  const result$: Subject<string> = new Subject<string>();
  const obs = result$.asObservable().pipe(
    bufferCount(4), // or any desired number of callback
    map((results: string[]) => results.join(''))
  );

  function A(n) {
    let r = "A".repeat(n);
    result$.next(r);
  }

  function B(n) {
    let r = "B".repeat(n);
    result$.next(r);
  }

  function C(n) {
    let r = "C".repeat(n);
    result$.next(r);
  }

  function D(n) {
    let r = "D".repeat(n);
    result$.next(r);
  }

  // --- BELOW Part can NOT be edit ---

  runLib(A, B, C, D);
  return obs;
}

您可以在 this stackblitz 中找到完整代码,或在代码段下方运行

You can find the full code in this stackblitz or run below snippet

// SET-UP - NOT EDIT Below code
const { Subject } = rxjs;
const { take, bufferCount, map } = rxjs.operators; // example

// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }

function runLib(cA,cB,cC,cD) {
    libA( cA ); libB( cB ); libC( cC ); libD( cD );
}

getData().subscribe(finalResult => {
  console.log(finalResult) // The result is WRONG here!
}, e=>{},_=> console.log('finished - unsubscribed'));


function getData() {
  // --- BELOW Part can be EDIT ---
  const result$ = new Subject();

  function A(n) {
    let r = "A".repeat(n);
    result$.next(r);
  }

  function B(n) {
    let r = "B".repeat(n);
    result$.next(r);
  }

  function C(n) {
    let r = "C".repeat(n);
    result$.next(r);
  }

  function D(n) {
    let r = "D".repeat(n);
    result$.next(r);
  }

  const obs = result$.pipe(
    bufferCount(4), // or any desired number of callback
    take(1),
    map(results=> results.join``)
  );

  // --- BELOW Part can NOT be edit ---

  runLib(A, B, C, D);
  return obs;
}

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

这篇关于使用 observables 的旧式回调同步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆