将无限异步回调序列转换为Observable序列? [英] Convert infinite async callback sequence to Observable sequence?

查看:113
本文介绍了将无限异步回调序列转换为Observable序列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有以下基于异步回调的无限序列,我会在一段时间后取消:

Let's say I have the following asynchronous callback-based "infinite" sequence, which I cancel after some time:

'use strict';

const timers = require('timers');

let cancelled = false;

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 10000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

function cancellableSequence(callback) {
  asyncOperation((error, processTime) => {
    console.log('Did stuff');
    if (!cancelled) {
      process.nextTick(() => { cancellableSequence(callback); });
    } else {
      callback(null, processTime);
    }
  });
}

cancellableSequence((error, lastProcessTime) => {
  console.log('Cancelled');
});

timers.setTimeout(() => { cancelled = true; }, 0);

asyncOperation 将执行并回拨于至少一次,取消消息不会立即显示,而是在 asyncOperation 完成后显示。对 asyncOperation 的调用次数取决于内部 delayMsec 值以及传递给的延迟参数最后的setTimeout()(尝试显示这些是可变的)。

The asyncOperation will execute and call back at least once, and the cancellation message will not display immediately, but rather after asyncOperation is complete. The number of calls to asyncOperation depends on the internal delayMsec value and the delay argument passed to setTimeout() at the end (an attempt to show that these are variable).

我开始学习RxJS5,并认为它也许有可能把它转换成一个Observable序列(oooh,一个Observable订阅可以取消订阅()d - 看起来整洁!)。

I'm starting to learn RxJS5, and thought it might be possible to convert this into an Observable sequence ("oooh, an Observable subscription can be unsubscribe()d - that looks neat!").

然而,我的尝试将 cancellableSequence 转换为ES6生成器(如何制作无限?)产生 Observable.bindNodeCallback(asyncOperation)()导致立即收益率,在我的情况下是不受欢迎的行为。

However, my attempts at turning cancellableSequence into an ES6 generator (how else to make infinite?) yielding Observable.bindNodeCallback(asyncOperation)() resulted in immediate yields, which in my case is undesired behavior.

我不能使用 Observable.delay() Observable.timer(),因为我没有已知的一致间隔。 ( asyncOperation 中的Math.random(...)试图表明我作为调用者不控制时间,并且回调发生在某个未知的时间之后 。)

I cannot use Observable.delay() or Observable.timer(), as I do not have a known, consistent interval. (The Math.random(...) in asyncOperation was an attempt to indicate that I as the caller do not control the timing, and the callback happens "some unknown time later.")

我失败的尝试:

'use strict';

const timers = require('timers');
const Rx = require('rxjs/Rx');

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 10000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
function* generator() {
  while (true) {
    console.log('Yielding...');
    yield operationAsObservable();
  }
}

Rx.Observable.from(generator()).take(2).mergeMap(x => x).subscribe(
  x => console.log(`Process took: ${x}msec`),
  e => console.log(`Error: ${e}`),
  c => console.log('Complete')
)

结果是输出结果:

Yielding...
Taking 2698msec to process...
Yielding...
Taking 2240msec to process...
Process took: 2240msec
Process took: 2698msec
Complete

收益率立即发生。 进程采取:xxx 输出发生在您预期时(分别在2240和2698ms之后)。

The yields occur right away. The Process took: xxx output occurs when you'd expect (after 2240 and 2698ms, respectively).

(平心而论,我关心收益率之间延迟的原因是 asyncOperation()这里实际上是一个限速令牌桶库,它控制异步回调的速率 - 我希望保留的实现。)

(In all fairness, the reason I care about the delay in between yields is that asyncOperation() here is in reality a rate-limiting token bucket library which controls the rate of asynchronous callbacks - an implementation which I'd like to retain.)

另外,我试图替换 take(2)延迟取消,但从未发生过:

As an aside, I attempted to replace take(2) with a delayed cancellation, but that never occurred:

const subscription = Rx.Observable.from(generator()).mergeMap(x => x).subscribe(
  x => console.log(`Process took: ${x}msec`),
  e => console.log(`Error: ${e}`),
  c => console.log('Complete')
)

console.log('Never gets here?');
timers.setTimeout(() => {
  console.log('Cancelling...');
  subscription.unsubscribe();
}, 0);

我可以尝试通过RxJS进行可取消订阅吗? (我可以看到其他方法,比如 process.exec('node',...)来运行 asyncOperation()作为一个单独的过程,让我能够 process.kill(..)等等,但是我们不要去那里......)。

Can what I'm attempting be accomplished with a cancellable subscription via RxJS? (I can see other approaches, such as process.exec('node', ...) to run asyncOperation() as a separate process, giving me the ability to process.kill(..), etc., but let's not go there...).

我的基于回调的初始实现是否是实现可取消序列的建议方法?

Is my initial callback-based implementation the suggested way to implement a cancellable sequence?

更新的解决方案:

请参阅我的回复评论@ user3743222的答案如下。这是我最终得到的结果(用 Observable.expand()替换ES6生成器):

See my reply comment to @user3743222's answer below. Here's what I ended up with (replace ES6 generator with Observable.expand()):

'use strict';

const timers = require('timers');
const Rx = require('rxjs/Rx');

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 10000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);

const subscription = Rx.Observable
  .defer(operationAsObservable)
  .expand(x => operationAsObservable())
  .subscribe(
    x => console.log(`Process took: ${x}msec`),
    e => console.log(`Error: ${e}`),
    c => console.log('Complete')
  );

subscription.add(() => {
  console.log('Cancelled');
});

timers.setTimeout(() => {
  console.log('Cancelling...');
  subscription.unsubscribe();
}, 0);

更新的解决方案2:

这是什么我提出了备用RxJS4 repeatWhen()方法:

Here's what I came up with for the alternate RxJS4 repeatWhen() approach:

'use strict';

const timers = require('timers');
const Rx = require('rx');

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 1000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.fromNodeCallback(asyncOperation);

const subscription = Rx.Observable
  .defer(operationAsObservable)
  .repeatWhen(x => x.takeWhile(y => true))
  .subscribe(
    x => console.log(`Process took: ${x}msec`),
    e => console.log(`Error: ${e}`),
    c => console.log('Complete')
  );

timers.setTimeout(() => {
  console.log('Cancelling...');
  subscription.dispose();
}, 10000);


推荐答案

每次完成时你似乎都在重复一个动作。这似乎是一个很好的用例 expand repeatWhen

You seem to be repeating an action every time it finishes. That looks like a good use case for expand or repeatWhen.

通常情况如下:

Rx.Observable.just(false).expand(_ => {  
  return cancelled ? Rx.Observable.empty() : Rx.Observable.fromCallback(asyncAction)
})

您可以在任何时间点将取消设置为true,当前操作完成时,它会停止循环。没有测试过,所以我很想知道最终是否有效。

You put cancelled to true at any point of time and when the current action finishes, it stops the loop. Haven't tested it so I would be interested to know if that worked in the end.

你可以查看有关民意调查的类似问题:

You can have a look at similar questions about polling:

  • How to build an rx poller that waits some interval AFTER the previous ajax promise resolves?

文档:

  • [fromCallback]
  • [expand]

文档链接适用于Rxjs 4,但与v5相比应该没有太大的变化

Documentation links are for Rxjs 4 but there should not be much changes vs v5

这篇关于将无限异步回调序列转换为Observable序列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆