去抖动和缓冲 rxjs 订阅 [英] Debounce and buffer an rxjs subscription
问题描述
我有一个消息队列处理器,可以将消息提供给服务...
I have a message queue processor that feeds messages to a service...
q.on("message", (m) => {
service.create(m)
.then(() => m.ack())
.catch(() => n.nack())
})
该服务使用 RxJS Observable 并订阅 debounceTime()
这些请求.
The service uses an RxJS Observable and subscription to debounceTime()
those requests.
class Service {
constructor() {
this.subject = new Subject()
this.subject.debounceTime(1000)
.subscribe(({ req, resolve, reject }) =>
someOtherService.doWork(req)
.then(() => resolve())
.catch(() => reject())
)
}
create(req) {
return new Promise((resolve, reject) =>
this.subject.next({
req,
resolve,
reject
})
)
}
}
问题是只有去抖动的请求才会得到 ackd/nackd.如何确保订阅也解决/拒绝其他请求?bufferTime()
让我参与其中,但它不会在每次调用 next()
时重置超时持续时间.
The issue is that only the debounced request gets ackd/nackd. How can I ensure that the subscription also resolves/rejects the other requests? bufferTime()
gets me a part of the way there, but it does not reset the timeout duration on each call to next()
.
推荐答案
您当前使用的 debounceTime
操作符可用于创建一个 observable,它可以通知 buffer
当前缓冲区应该何时关闭.
The debounceTime
operator that you are currently using can be used to create an observable that can notify buffer
of when the current buffer should be closed.
然后,buffer
将发出一个在去抖动时收到的消息数组,您可以对它们做任何你想做的事情:
Then, buffer
will emit an array of the messages that were received whilst debouncing and you can do with them whatever you want:
this.subject = new Subject();
const closingNotifier = this.subject.debounceTime(1000);
this.subject.buffer(closingNotifier).subscribe(messages => {
const last = messages.length - 1;
messages.forEach(({ req, resolve, reject }, index) => {
if (index === last) {
/* whatever you are doing, now, with the debounced message */
} else {
/* whatever you need to do with the ignored messages */
}
});
});
这篇关于去抖动和缓冲 rxjs 订阅的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!