RxJs:zip运算符的有损形式 [英] RxJs: lossy form of zip operator

查看:230
本文介绍了RxJs:zip运算符的有损形式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑使用 zip 运算符将两个无限的Observable压缩在一起发射物品的次数是另一次发射的两倍。

目前的实施方案是无损耗的,即如果我让这些Observables发射一小时,然后我在它们的发射率之间切换,第一个Observable最终会赶上另一个。

随着缓冲区变得越来越大,这将导致内存爆炸。

如果第一个observable将发出几个小时的项目,那么同样会发生第二个将在最后发出一个项目。



如何为此运营商实现有损行为?我只是想随时从两个流中获得排放而且我不关心我错过的更快流量的排放量。



澄清:




  • 由于的无损性质,我在这里试图解决的主要问题是内存爆炸zip operator。

  • 我希望随时从两个流中获取排放,即使两个流每次都发出相同的值



示例:

  Stream1:1 2 3 4 5 6 7 
Stream2:10 20 30 40 50 60 70

常规 zip 将产生以下输出:

  [1,10] 
[2,20]
[3,30]
[4,40]
[5,50]
[6,60 ]
[7,70]

  const Observable = Rx.Observable; const Subject = Rx.Subject; const s1 = new Subject(); const s2 = new Subject(); Observable.zip(s1,s2).subscribe(console.log); s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); s2.next(40); s2.next(50); s2.next(60); s2.next(70);  

 < script src =https:// unpkg。 COM / @ reactivex / rxjs @ 5.0.3 / DIST /全球/ Rx.js>< /脚本>  



我希望它产生的输出:

  [1, 10] 
[3,20]
[5,30]

说明:

有损 zip 运算符 zip with缓冲区大小 1 。这意味着它只会保留首先发出的流中的第一个项目,并将丢弃所有其余项目(第一个项目和第二个流中的第一个项目之间到达的项目)。所以在这个例子中会发生以下情况: stream1 发出 1 ,有损zip会记住它并忽略所有 stream1 上的项目,直到 stream2 发出。第一次 stream2 的排放是 10 所以 stream1 looses 2 。在相互排放(第一次发射有损 zip )后,它重新开始:记住 3 ,松散 4 ,发出 [3,20] 。然后重新开始:记住 5 ,松散 6 7 ,发出 [5,30] 。然后重新开始:记住 40 ,松散 50 60 70 并等待 stream1 上的下一项。



示例2:

  Stream1:1 2 3。 .. 100000000000 
Stream2:

常规 zip 运算符会爆炸内存。

我不希望它。



摘要:

基本上我希望有损 zip 运算符只记住 stream 1 之前的共同发射,当流2 赶上流1 时发出。并重复。

解决方案

以下内容将为您提供所需的行为:

  Observable.zip(s1.take(1),s2.take(1))。repeat()

RxJs 5.5 管道语法:

  zip(s1.pipe(take(1)),s2.pipe(take(1)))。pipe(repeat()); 

  const s1 = new Rx.Subject(); const s2 = new Rx.Subject(); Rx.Observable.zip(s1.take(1),s2.take(1))。repeat().subscribe (的console.log); s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); s2.next(40); s2.next(50); s2.next(60); s2.next(70);  

 < script src =https:// unpkg。 COM / @ reactivex / rxjs @ 5.0.3 / DIST /全球/ Rx.js>< /脚本>  



说明:




  • 重复运算符(在其当前实现中)在后者完成时重新订阅源可观察对象,即在此特定情况下,它在每个共同时重新订阅 zip 发射。

  • zip 组合了两个observable并等待它们两个发射。 combineLatest 也会这样做,因为 take(1)
  • $ b并不重要$ b
  • take(1)实际上会处理内存爆炸并定义有损行为



如果你想在相互发射时从每个流中获取最后一个而不是第一个值,请使用:

  Observable.combineLatest(s1,s2).take(1).repeat()

In RxJs 5.5 管道语法:

  combineLatest(s1.pipe(take(1)) ,s2.pipe(take(1)))。pipe(repeat()); 

  const s1 = new Rx.Subject(); const s2 = new Rx.Subject(); Rx.Observable.combineLatest(s1,s2).take(1).repeat().subscribe(console.log) ; s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); s2.next(40); s2.next(50); s2.next(60); s2.next(70);  

 < script src =https:// unpkg。 COM / @ reactivex / rxjs @ 5.0.3 / DIST /全球/ Rx.js>< /脚本>  


Consider using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other.
The current implementation is loss-less, i.e. if I keep these Observables emitting for an hour and then I switch between their emitting rates, the first Observable will eventually catch up with the other.
This will cause memory explosion at some point as the buffer grows larger and larger.
The same will happen if first observable will emit items for several hours and the second will emit one item at the end.

How do I achieve lossy behavior for this operator? I just want to emit anytime I get emissions from both streams and I don't care how many emissions from the faster stream I miss.

Clarifications:

  • Main problem I'm trying to solve here is memory explosion due to the loss-less nature of zip operator.
  • I want to emit anytime I get emissions from both streams even if both streams emit the same value every time

Example:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

Regular zip will produce the following output:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 

<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

The output I'd like it to produce:

[1, 10]
[3, 20]
[5, 30]

Explanation:
Lossy zip operator is zip with buffer size 1. That means it will only keep the first item from the stream that emitted first and will loose all the rest (items that arrive between first item and first emission from the second stream). So what happens in the example is the following: stream1 emits 1, lossy zip "remembers" it and ignores all the items on stream1 until stream2 emits. First emission of stream2 is 10 so stream1 looses 2. After mutual emission (the first emission of lossy zip) it starts over: "remember" 3, "loose" 4, emit [3,20]. Then start over: "remember" 5, "loose" 6 and 7, emit [5,30]. Then start over: "remember" 40, "loose" 50,60,70 and wait for the next item on stream1.

Example 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

Regular zip operator will explode the memory in this case.
I don't want it to.

Summary:
Essentially I expect the lossy zip operator to remember only the first value emitted by stream 1 after previous mutual emission and emit when stream 2 catches up with stream 1. And repeat.

解决方案

The following will give you the desired behavior:

Observable.zip(s1.take(1), s2.take(1)).repeat()

In RxJs 5.5 pipe syntax:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 

<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

Explanation:

  • repeat operator (in its current implementation) resubscribes to the source observable upon the latter's completion, i.e. in this particular case it resubscribes to zip upon every mutual emission.
  • zip combines two observables and waits for both of them to emit. combineLatest will do as well, it doesn't really matter because of take(1)
  • take(1) actually takes care of memory explosion and defines lossy behavior

If you want to take the last and not the first value from each stream upon mutual emission use this:

Observable.combineLatest(s1, s2).take(1).repeat()

In RxJs 5.5 pipe syntax:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 

<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

这篇关于RxJs:zip运算符的有损形式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆