如何在可观察流中处理前n个项并保持不同的项 [英] How to process first n items and remaining one differently in a observable stream

查看:94
本文介绍了如何在可观察流中处理前n个项并保持不同的项的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

例如,

给定一定数量(m)的数字流(m1,m2,m3,m4,m5,m6 ......),并对前n个项目(n可以小于,等于或大于m)应用变换(2 * i),对其余项目应用另一个变换(3 * i)。和

given a stream of a certain number (m) of numbers (m1, m2, m3, m4, m5, m6...), and apply a transformation (2 * i) to first n items (n can be less, equal or larger than m), apply another transformation (3 * i) to the remaining items. and

返回结果:m1 * 2,m2 * 2,m3 * 3,m4 * 3,m5 * 3,m6 * 3 ......(假设n = 2在这里)。

return result : m1*2, m2*2, m3*3, m4*3, m5*3, m6*3... (assuming n=2 here).

我试图使用take(n)和skip(n)然后连续,但看起来像take(n)将删除其余的项目序列,并在之后返回任何内容后跳过(n)。

I was trying to use take(n) and skip(n) and then concatwith, but looks like take(n) will drop the remaining items in the sequence , and make skip(n) after that return nothing.

推荐答案

您可以共享m的流,然后合并回来 take() skip() streams,如下所示:

You can share your m's stream, and then merge back together take() and skip() streams, something like this:

    int m = 10;
    int n = 8;
    Observable<Integer> numbersStream = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .publish();

    Observable<Integer> firstNItemsStream = numbersStream.take(n)
            .map(i -> i * 2);

    Observable<Integer> remainingItemsStream = numbersStream.skip(n)
            .map(i -> i * 3);

    Observable.merge(firstNItemsStream, remainingItemsStream)
            .subscribe(integer -> System.out.println("result = " + integer));
    numbersStream.connect();

编辑:

由@指出AE Daphne, share()将开始与第一个订阅者一起发送,因此如果Observable已经开始发出项目,则第二个订阅者可能会错过notification / s,所以在这种情况下还有其他可能性:

cache() - 将回复所有缓存发出的项目并将其回复给每个新订阅者,但是将牺牲取消订阅能力,因此需要谨慎使用。

回复()。refCount() - 将创建 Observable reply()每个新订阅者的所有先前项目(类似于缓存),但最后会取消订阅订阅者取消订阅。


As point out by @A.E. Daphne, share() will start emitting with the first subscriber, thus second subscriber might miss notification/s if the Observable started already to emit item/s, so in this case there are other possibilities:
cache() - will reply all cache emitted items and reply them to each new subscriber, but will sacrifice the unsubscription ability, thus need to be used carefully.
reply().refCount() - will create Observable that reply() all previous items to each new Subscriber (similar to cache), but will unsubscribe when the last subscriber unsubscribe from it.

在这两种情况下,都应该考虑内存,因为 Observable 会将所有发出的项目缓存在内存中。

In both cases, memory should take into consideration as the Observable will cache all emitted items in memory.

publish() - 无需缓存所有以前的项目的其他可能性是使用 publish()创建 ConnectableObservable ,并将其命名为 connect()在订阅所有订阅者之后开始排放的方法,因此将获得同步并且所有订阅者将正确地获得所有通知。

publish() - Additional possibility, without caching all previous items, would be to use publish() to create ConnectableObservable, and call it's connect() method to begin emissions after all required subscribers subscribed, thus will get synchronization and all subscribers will get all notifications correctly.

这篇关于如何在可观察流中处理前n个项并保持不同的项的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆