combineLatest 仅在其中一个流发生变化时发出 [英] combineLatest emit only when one of the streams changes

查看:36
本文介绍了combineLatest 仅在其中一个流发生变化时发出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个带有频繁值的流和一个带有较慢值的流.我想将它们组合起来,但只在较慢的一个发出时才发出一个值.所以 combineLatest 不起作用.像这样:

I have a stream with frequent values and one with slower ones. I want to combine them, but only emit a value when the slower one emits. So combineLatest doesn't work. Like so:

a1
a2
b1
(a2,b1)
a3
a4
a5
b2
(a5,b2)

目前我是这样做的,有没有更干净的方法?

Currently I'm doing it like follows, is there a cleaner way?

withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] =
  Observable({ o =>
    var last : A
    fast.subscribe({a => last = a})
    slow.subscribe({b => o.onNext((last,b))})
  })

edit:这个操作符现在在 Rx 中,被称为 withLatestFrom.

edit: This operator is now in Rx and is called withLatestFrom.

推荐答案

你要找的是一个我称之为combinePrev"的组合器,它在 API 中并不存在,但在很多情况下是非常必要的.sample 运算符很接近,但它不会组合两个流.我也错过了 RxJS 中的combinePrev".事实证明,combinePrev"(withLatest")的实现很简单,只依赖于map和switch:

What you are looking for is a combinator I have called "combinePrev", which doesn't exist in the API but turns out to be very necessary in many situations. sample operator comes close, but it doesn't combine the two streams. I've also missed "combinePrev" in RxJS. It turns out, the implementation of "combinePrev" ("withLatest") is simple and just depends on map and switch:

withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = {
  val hotSlow = slow.publish.refCount
  fast.map({a => hotSlow.map({b => (a,b)})}).switch
}

这是在 RxJS 中实现的相同运算符的 jsfiddle 示例.

Here is a jsfiddle example of the same operator implemented in RxJS.

虽然操作符不在 Rx 中,但您可以使用隐式类,这样您就可以使用 slow.withLatest(fast):

While the operator is not in Rx, you can use an implicit class so you can use slow.withLatest(fast):

implicit class RXwithLatest[B](slow: Observable[B]) {
  def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */
}

注意:slow必须是hot.如果 slow 是一个冷 Observable,withLatest 不起作用.

Note: slow must be hot. If slow is a cold Observable, withLatest doesn't work.

这篇关于combineLatest 仅在其中一个流发生变化时发出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆