rxjs:如何通过 Observables 对响应进行排序 [英] rxjs: how to order responses via Observables

查看:38
本文介绍了rxjs:如何通过 Observables 对响应进行排序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 socket.io 向我的前端发送一系列响应.响应是顺序的,但根据 socket.io 创建的连接,它们并不总是保证以正确的顺序出现 (https://github.com/josephg/ShareJS/issues/375).

I am using socket.io to send a series of responses to my front-end. The responses are intended to be sequential, but depending on the connection created by socket.io they're not always guaranteed to come in the correct order (https://github.com/josephg/ShareJS/issues/375).

假设每个响应都有一个序列字段,其中包含一个数字(如上图中的数字所示),observable 应该按顺序发出这些响应.

Assuming each response had a sequence field that held a number (shown as the number in the picture above), the observable should emit these responses in order.

如果收到无序响应并且经过一定时间 (n) 没有收到任何响应,我希望我的 observable 发出错误,向我的前端发送信号以重置连接.

If a response is received out of order and a certain amount of time (n) passes without getting any response, I would like for my observable to emit an error, to signal to my front-end to reset the connection.

推荐答案

一个非常好的问题.下面是注释了最重要部分的片段.

A really nice problem. Below a snippet with most important parts commented.

// mock ordered values
const mockMessages = Rx.Observable.fromEvent(document.querySelector('#emit'), 'click')
  .map((e, index) => ({
    index,
    timestamp: e.timeStamp
  }))
  .delayWhen(() => Rx.Observable.timer(Math.random() * 2000)) // distort order

// there is a lot of mutability in `keepOrder`, but all of it
// is sealed and does not leak to outside environment
const keepOrder = timeoutMs => stream =>
  Rx.Observable.defer(() => // need defer to support retries on error
    stream.scan((acc, v) => {
      acc.buffer.push(v)
      acc.buffer.sort((v1, v2) => v1.index - v2.index)
      return acc
    }, {
      lastEmitted: -1,
      buffer: []
    })
    .mergeMap(info => {
      const emission = []
      while (info.buffer.length && info.lastEmitted + 1 === info.buffer[0].index) {
        emission.push(info.buffer.shift())
        info.lastEmitted += 1
      }
      return Rx.Observable.of(emission)
    })
    .switchMap(emissions => {
      if (!emissions.length) { // this condition indicates out of order
        return Rx.Observable.timer(timeoutMs)
          .mergeMapTo(Rx.Observable
            .throw(new Error('ORDER_TIMEOUT')))
      } else {
        return Rx.Observable.from(emissions)
      }
    })
  )


mockMessages
  .do(x => console.log('mocked', x.index))
  .let(keepOrder(1000)) // decrease timeoutMs to increase error probablity
  .do(x => console.log('ORDERED', x.index))
  .retryWhen(es => es
    .do(e => console.warn('ERROR', e)))
  .subscribe()

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>

<button id="emit">EMIT</button>

这篇关于rxjs:如何通过 Observables 对响应进行排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆