RxJs:有损形式的 zip 运算符 [英] RxJs: lossy form of zip operator
问题描述
考虑使用 zip 运算符将两个无限的 Observable 压缩在一起,其中之一它发出项目的频率是另一个的两倍.
当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后我在它们的发射率之间切换,第一个 Observable 最终会赶上另一个.
随着缓冲区越来越大,这将在某些时候导致内存爆炸.
如果第一个 observable 将发射项目数小时,而第二个将在最后发射一个项目,也会发生同样的情况.
Consider using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other.
The current implementation is loss-less, i.e. if I keep these Observables emitting for an hour and then I switch between their emitting rates, the first Observable will eventually catch up with the other.
This will cause memory explosion at some point as the buffer grows larger and larger.
The same will happen if first observable will emit items for several hours and the second will emit one item at the end.
如何实现此运算符的有损行为?我只想在我从两个溪流中获得排放物的任何时候排放,我不在乎我错过了来自更快的溪流的排放量.
How do I achieve lossy behavior for this operator? I just want to emit anytime I get emissions from both streams and I don't care how many emissions from the faster stream I miss.
说明:
- 我在这里尝试解决的主要问题是由于
zip
运算符的无损性质而导致的内存爆炸. - 我想在任何时候从两个流中获得排放,即使两个流每次都发出相同的值
- Main problem I'm trying to solve here is memory explosion due to the loss-less nature of
zip
operator. - I want to emit anytime I get emissions from both streams even if both streams emit the same value every time
示例:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
常规 zip
将产生以下输出:
Regular zip
will produce the following output:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
我希望它产生的输出:
[1, 10]
[3, 20]
[5, 30]
解释:
有损 zip
运算符是 zip
,缓冲区大小为 1
.这意味着它只会保留最先发出的流中的第一个项目,而将丢失所有其余项目(在第一个项目和第二个流的第一个发射之间到达的项目).因此,示例中发生的情况如下:stream1
发出 1
,有损 zip记住"它并忽略 stream1
上的所有项目,直到 stream2
发出.stream2
的第一次发射是 10
所以 stream1
丢失 2
.在相互发射后(有损 zip
的第一次发射)它重新开始:记住"3
,松散"4
,发射 [3,20]
.然后重新开始:remember"5
,loose"6
和7
,发出[5,30]
.然后重新开始:记住"40
,loose"50
,60
,70
等待下一个stream1
上的项目.
Explanation:
Lossy zip
operator is zip
with buffer size 1
. That means it will only keep the first item from the stream that emitted first and will loose all the rest (items that arrive between first item and first emission from the second stream). So what happens in the example is the following: stream1
emits 1
, lossy zip "remembers" it and ignores all the items on stream1
until stream2
emits. First emission of stream2
is 10
so stream1
looses 2
. After mutual emission (the first emission of lossy zip
) it starts over: "remember" 3
, "loose" 4
, emit [3,20]
. Then start over: "remember" 5
, "loose" 6
and 7
, emit [5,30]
. Then start over: "remember" 40
, "loose" 50
,60
,70
and wait for the next item on stream1
.
示例 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
在这种情况下,常规 zip
运算符会爆炸内存.
我不想这样.
Regular zip
operator will explode the memory in this case.
I don't want it to.
总结:
本质上,我希望有损 zip
运算符只记住 stream 1
在之前的相互发射之后 发出的第一个值,并在 stream 2 时发出
赶上 stream 1
.并重复.
Summary:
Essentially I expect the lossy zip
operator to remember only the first value emitted by stream 1
after previous mutual emission and emit when stream 2
catches up with stream 1
. And repeat.
推荐答案
以下将为您提供所需的行为:
The following will give you the desired behavior:
Observable.zip(s1.take(1), s2.take(1)).repeat()
在 RxJs 5.5+
管道语法中:
zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
说明:
repeat
操作符(在其当前实现中)在源 observable 完成后重新订阅源 observable,即在这种特殊情况下,它在每次相互发射时重新订阅zip
.zip
组合两个 observable 并等待它们都发出.combineLatest
也可以,这并不重要,因为take(1)
take(1)
实际上负责内存爆炸并定义有损行为
repeat
operator (in its current implementation) resubscribes to the source observable upon the latter's completion, i.e. in this particular case it resubscribes tozip
upon every mutual emission.zip
combines two observables and waits for both of them to emit.combineLatest
will do as well, it doesn't really matter because oftake(1)
take(1)
actually takes care of memory explosion and defines lossy behavior
如果您想在相互发射时从每个流中获取最后一个值而不是第一个值,请使用:
If you want to take the last instead of the first value from each stream upon mutual emission use this:
Observable.combineLatest(s1, s2).take(1).repeat()
在 RxJs 5.5+
管道语法中:
combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.combineLatest(s1,s2).take(1).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
这篇关于RxJs:有损形式的 zip 运算符的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!