rxjs:如何通过 Observables 对响应进行排序 [英] rxjs: how to order responses via Observables
问题描述
我正在使用 socket.io 向我的前端发送一系列响应.响应是顺序的,但根据 socket.io 创建的连接,它们并不总是保证以正确的顺序出现 (https://github.com/josephg/ShareJS/issues/375).
I am using socket.io to send a series of responses to my front-end. The responses are intended to be sequential, but depending on the connection created by socket.io they're not always guaranteed to come in the correct order (https://github.com/josephg/ShareJS/issues/375).
假设每个响应都有一个序列字段,其中包含一个数字(如上图中的数字所示),observable 应该按顺序发出这些响应.
Assuming each response had a sequence field that held a number (shown as the number in the picture above), the observable should emit these responses in order.
如果收到无序响应并且经过一定时间 (n) 没有收到任何响应,我希望我的 observable 发出错误,向我的前端发送信号以重置连接.
If a response is received out of order and a certain amount of time (n) passes without getting any response, I would like for my observable to emit an error, to signal to my front-end to reset the connection.
推荐答案
一个非常好的问题.下面是注释了最重要部分的片段.
A really nice problem. Below a snippet with most important parts commented.
// mock ordered values
const mockMessages = Rx.Observable.fromEvent(document.querySelector('#emit'), 'click')
.map((e, index) => ({
index,
timestamp: e.timeStamp
}))
.delayWhen(() => Rx.Observable.timer(Math.random() * 2000)) // distort order
// there is a lot of mutability in `keepOrder`, but all of it
// is sealed and does not leak to outside environment
const keepOrder = timeoutMs => stream =>
Rx.Observable.defer(() => // need defer to support retries on error
stream.scan((acc, v) => {
acc.buffer.push(v)
acc.buffer.sort((v1, v2) => v1.index - v2.index)
return acc
}, {
lastEmitted: -1,
buffer: []
})
.mergeMap(info => {
const emission = []
while (info.buffer.length && info.lastEmitted + 1 === info.buffer[0].index) {
emission.push(info.buffer.shift())
info.lastEmitted += 1
}
return Rx.Observable.of(emission)
})
.switchMap(emissions => {
if (!emissions.length) { // this condition indicates out of order
return Rx.Observable.timer(timeoutMs)
.mergeMapTo(Rx.Observable
.throw(new Error('ORDER_TIMEOUT')))
} else {
return Rx.Observable.from(emissions)
}
})
)
mockMessages
.do(x => console.log('mocked', x.index))
.let(keepOrder(1000)) // decrease timeoutMs to increase error probablity
.do(x => console.log('ORDERED', x.index))
.retryWhen(es => es
.do(e => console.warn('ERROR', e)))
.subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
<button id="emit">EMIT</button>
这篇关于rxjs:如何通过 Observables 对响应进行排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!