RxJava - 合并的 Observable 可以随时接受更多的 Observable? [英] RxJava - Merged Observable that accepts more Observables at any time?

查看:35
本文介绍了RxJava - 合并的 Observable 可以随时接受更多的 Observable?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要一个 Observable 实现来保存一个或多个 Observable 并将它们合并.但关键是:我想在任何时候添加更多 Observables 以进行合并,我想它也可能支持删除它们.

I am encountering a need for an Observable implementation that holds one or more Observables and merges them. But here is the kicker: I want to add more Observables to be merged at any time, and I guess it might as well support removing them too.

要使其真正有效,所有订阅者都必须收到来自订阅后添加的新 Observable 的通知.除非所有合并的 Observable 都是冷的并调用 onComplete(),否则我想即使添加了更多的 Observable,也可以让订阅取消订阅.这更多是为了合并多个无限热的 Observable 并能够随时添加更多.

For it to be truly effective, all Subscribers must receive notifications from new Observables that are added post-subscription. Unless all the merged Observables are cold and call onComplete(), then I guess it is okay to let the subscriptions unsubscribe even if more Observables are added. This is more for merging multiple infinite hot Observables and being able to add more at any time.

MergableObservable<MyEvent> allSources = new MergableObservable<>();

//later in application
Observable<MyEvent> eventSource1 = ...
allSources.add(eventSource1);

//and later again
Observable<MyEvent> eventSource2 = ...
allSources.add(eventSource2 );

//and so on
Observable<MyEvent> eventSource3 = ...
allSources.add(eventSource3);

我知道有合并运算符,但我需要一个可变结构.我是否错过了已经存在的东西?我宁愿不使用主题,除非它绝对适合这种情况.

I know there are merging operators, but I need a mutable structure. Am I missing something that already exists? I'd prefer to not use subjects unless it is absolutely appropriate for this situation.

推荐答案

您无法避免使用 Subjects,因为您想手动推送新的 Observable 源而不是自然地"生成它们.

You can't avoid Subjects because you want to manually push new Observable sources and not generate them "naturally".

Subject<Observable<T>, Observable<T>> o = PublishSubject
    .<Observable<T>>create().toSerialized();

ConcurrentHashSet<Observable<T>> live = ...

o.flatMap(v -> v.takeWhile(x -> live.containsKey(v))).subscribe(...);

Observable<T> inner = ...
live.add(inner);
o.onNext(inner);

//...

live.remove(inner);

这篇关于RxJava - 合并的 Observable 可以随时接受更多的 Observable?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆