RxJava:可观察和默认线程 [英] RxJava: Observable and default thread

查看:41
本文介绍了RxJava:可观察和默认线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        s.onNext("1");
                        s.onComplete();
                    }
                });
                thread.setName("background-thread-1");
                thread.start();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("map: thread=" + threadName);
                return "map-" + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onComplete: thread=" + threadName);
            }
        });

这是输出:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1

重要细节:我正在从另一个线程(Android 中的 main 线程)调用 subscribe 方法.

Important detail: I'm calling the subscribe method from another thread (main thread in Android).

所以看起来 Observable 类是同步的,默认情况下,它在发出事件(s.onNext),对吗?我想知道......这是有意的行为还是我只是误解了一些东西?实际上,我期望至少 onNextonComplete 回调将在调用者的线程上调用,而不是在发出事件的线程上调用.我是否正确理解在这种特殊情况下实际调用者的线程无关紧要?至少在异步生成事件时.

So looks like Observable class is synchronous and by default and it performs everything (operators like map + notifying subscribers) on the same thread which emits events (s.onNext), right? I wonder... is it intended behaviour or I just misunderstood something? Actually I was expecting that at least onNext and onComplete callbacks will be called on the caller's thread, not on the one emitting events. Do I understand correctly that in this particular case actual caller's thread doesn't matter? At least when events are generated asynchronously.

另一个问题 - 如果我从某个外部来源接收一些 Observable 作为参数怎么办(即我不是自己生成的)...作为它的用户,我无法检查它是否是同步的或异步,我只需要通过 subscribeOnobserveOn 方法明确指定我想要接收回调的位置,对吗?

Another concern - what if I receive some Observable as a parameter from some external source (i.e. I don't generate it on my own)... there is no way for me as its user to check if whether it is synchronous or asynchronous and I just have to explicitly specify where I want to receive callbacks via subscribeOn and observeOn methods, right?

谢谢!

推荐答案

RxJava 对并发没有意见.如果您不使用任何其他机制(如 observeOn/subscribeOn),它将在订阅线程上生成值.请不要在操作符中使用像 Thread 这样的低级结构,你可能会破坏契约.

RxJava is unopinionated about concurrency. It will produce values on the subscribing thread if you do not use any other mechanisem like observeOn/ subscribeOn. Please don't use low-level constructs like Thread in operators, you could break the contract.

由于使用了Thread,onNext会从调用线程('background-thread-1')中调用.订阅发生在调用(UI 线程)上.链中的每个操作符都将从background-thread-1"-calling-Thread 调用.订阅 onNext 也将从background-thread-1"调用.

Due to the use of Thread, the onNext will be called from the calling Thread ('background-thread-1'). The subscription happens on the calling (UI-Thread). Every operator down the chain will be called from 'background-thread-1'-calling-Thread. The subscription onNext will also be called from 'background-thread-1'.

如果您想生成不在调用线程上的值,请使用:subscribeOn.如果您想将线程切换回主线程,请在链中的某处使用observeOn.最有可能在订阅之前.

If you want to produce values not on the calling thread use: subscribeOn. If you want to switch the thread back to main use observeOn somewhere in the chain. Most likely before subscribing to it.

示例:

Observable.just(1,2,3) // creation of observable happens on Computational-Threads
            .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
            .map(integer -> integer) // map happens on Computational-Threads
            .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
            .subscribe(integer -> {
                // called from mainThread
            });

这是一个很好的解释.http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

这篇关于RxJava:可观察和默认线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆