具有多个订阅者和事件的RxJava并发 [英] RxJava concurrency with multiple subscribers and events

查看:763
本文介绍了具有多个订阅者和事件的RxJava并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一种将多个订户附加到RxJava Observable流的方法,每个订户都异步处理发出的事件.

I'm looking for a way to attach multiple subscribers to an RxJava Observable stream, with each subscriber processing emitted events asynchronously.

我首先尝试使用.flatMap(),但似乎不适用于任何后续订阅者.所有订阅者都在同一线程上处理事件.

I first tried using .flatMap() but that didn't seem to work on any subsequent subscribers. All subscribers were processing events on the same thread.

.flatMap(s -> Observable.just(s).subscribeOn(Schedulers.newThread()))

最终的工作是通过每次创建一个新的Observable来消耗新线程中的每个事件:

What ended up working was consuming each event in a new thread by creating a new Observable each time:

Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
            .subscribe(j -> {
                Observable.just(j)
                        .subscribeOn(Schedulers.newThread())
                        .subscribe(i -> {
                            try {
                                Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
                        });
            });

输出:

s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-2=>2
s1=>RxNewThreadScheduler-3=>3

最终有多个订阅者:

ConnectableObservable<String> e = Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
            .publish();

    e.subscribe(j -> {
        Observable.just(j)
                .subscribeOn(Schedulers.newThread())
                .subscribe(i -> {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
                });
    });

    e.subscribe(j -> {
        Observable.just(j)
                .subscribeOn(Schedulers.newThread())
                .subscribe(i -> {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i);
                });
    });

    e.connect();

输出:

s2=>RxNewThreadScheduler-4=>2
s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-3=>2
s2=>RxNewThreadScheduler-6=>3
s2=>RxNewThreadScheduler-2=>1
s1=>RxNewThreadScheduler-5=>3

但是,这似乎有些笨拙.有没有更优雅的解决方案,或者RxJava并不是一个很好的用例?

However, this seems a little clunky. Is there a more elegant solution or is RxJava just not a good use case for this?

推荐答案

您可以使用Flowable和parallel来实现它:

You can achieve it with Flowable and parallel:

        Flowable.fromIterable(Arrays.asList("1", "2", "3"))
                .parallel(3)
                .runOn(Schedulers.newThread())
                .map(item -> {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + item);

                    return Completable.complete();
                })
        .sequential().subscribe();

这篇关于具有多个订阅者和事件的RxJava并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆