RxJava -2 Observables 随时接受更多 Observables? [英] RxJava -2 Observables that accepts more Observables at any time?

查看:25
本文介绍了RxJava -2 Observables 随时接受更多 Observables?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在使用 rx-java 2 并且有一个用例,其中单个 Camel Route 订阅者需要使用多个 Observable.使用此解决方案作为参考,我有一个部分可行的解决方案.RxJava - 合并的 Observable 随时接受更多的 Observable?

I'm currently using rx-java 2 and have a use case where multiple Observables need to be consumed by single Camel Route subscriber. Using this solution as a reference, I have a partly working solution. RxJava - Merged Observable that accepts more Observables at any time?

我打算使用一个 PublishProcessor,它将订阅一个骆驼反应流订阅者,然后维护一个 ConcurrentHashSet>我可以动态添加新的 Observable.
我目前遇到了如何使用 PublishProcessor 添加/管理 Flowable 实例的问题?我真的是 rx java 的新手,所以感谢任何帮助!这是我到目前为止:

I'm planning to use a PublishProcessor<T> that will be subscribed to one camel reactive stream subscriber and then maintain a ConcurrentHashSet<Flowable<T>> where I can dynamically add new Observable.
I'm currently stuck on how can I add/manage Flowable<T> instances with PublishProcessor? I'm really new to rx java, so any help is appreciated! This is what I have so far :

PublishProcessor<T> publishProcessor = PublishProcessor.create();
CamelReactiveStreamsService camelReactiveStreamsService = 
CamelReactiveStreams.get(camelContext);
Subscriber<T> subscriber = 
     camelReactiveStreamsService.streamSubscriber("t-class",T.class);
}
Set<Flowable<T>> flowableSet = Collections.newSetFromMap(new ConcurrentHashMap<Flowable<T>, Boolean>());

public void add(Flowable<T> flowableOrder){
    flowableSet.add(flowableOrder);
}

public void subscribe(){
    publishProcessor.flatMap(x -> flowableSet.forEach(// TODO)
    }) .subscribe(subscriber);
}

推荐答案

您可以拥有一个 Processor 并订阅多个可观察流.您需要在添加和删除 observable 时通过添加和删除订阅来管理订阅.

You can have a single Processor and subscribe to more than one observable stream. You would need to manage the subscriptions by adding and removing them as you add and remove observables.

PublishProcessor<T> publishProcessor = PublishProcessor.create();

Map<Flowable<T>, Disposable> subscriptions = new ConcurrentHashMap<>();

void addObservable( Flowable<T> flowable ) {
  subscriptions.computeIfAbsent( flowable, fkey -> 
    flowable.subscribe( publishProcessor ) );
}
void removeObservable( Flowable<T> flowable ) {
  Disposable d = subscriptions.remove( flowable );
  if ( d != null ) {
    d.dispose();
  }
}
void close() {
  for ( Disposable d: subscriptions.values() ) {
    d.dispose();
  }
}

使用 flowable 作为地图的关键,并添加或删除订阅.

Use the flowable as the key to the map, and add or remove subscriptions.

这篇关于RxJava -2 Observables 随时接受更多 Observables?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆