去抖动和缓冲 rxjs 订阅 [英] Debounce and buffer an rxjs subscription

查看:30
本文介绍了去抖动和缓冲 rxjs 订阅的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个消息队列处理器,可以将消息提供给服务...

I have a message queue processor that feeds messages to a service...

q.on("message", (m) => {
  service.create(m)
    .then(() => m.ack())
    .catch(() => n.nack())
})

该服务使用 RxJS Observable 并订阅 debounceTime() 这些请求.

The service uses an RxJS Observable and subscription to debounceTime() those requests.

class Service {
  constructor() {
    this.subject = new Subject()
    this.subject.debounceTime(1000)
      .subscribe(({ req, resolve, reject }) =>
        someOtherService.doWork(req)
          .then(() => resolve())
          .catch(() => reject())
      )
  }

  create(req) {
    return new Promise((resolve, reject) =>
      this.subject.next({
        req,
        resolve,
        reject
      })
    )
  }
}

问题是只有去抖动的请求才会得到 ackd/nackd.如何确保订阅也解决/拒绝其他请求?bufferTime() 让我参与其中,但它不会在每次调用 next() 时重置超时持续时间.

The issue is that only the debounced request gets ackd/nackd. How can I ensure that the subscription also resolves/rejects the other requests? bufferTime() gets me a part of the way there, but it does not reset the timeout duration on each call to next().

推荐答案

您当前使用的 debounceTime 操作符可用于创建一个 observable,它可以通知 buffer 当前缓冲区应该何时关闭.

The debounceTime operator that you are currently using can be used to create an observable that can notify buffer of when the current buffer should be closed.

然后,buffer 将发出一个在去抖动时收到的消息数组,您可以对它们做任何你想做的事情:

Then, buffer will emit an array of the messages that were received whilst debouncing and you can do with them whatever you want:

this.subject = new Subject();
const closingNotifier = this.subject.debounceTime(1000);
this.subject.buffer(closingNotifier).subscribe(messages => {
  const last = messages.length - 1;
  messages.forEach(({ req, resolve, reject }, index) => {
    if (index === last) {
      /* whatever you are doing, now, with the debounced message */
    } else {
      /* whatever you need to do with the ignored messages */
    }
  });
});

这篇关于去抖动和缓冲 rxjs 订阅的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆