RxJava:可观察和默认线程 [英] RxJava: Observable and default thread
问题描述
我有以下代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
这是输出:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要细节:我正在从另一个线程(Android 中的 main
线程)调用 subscribe
方法.
Important detail: I'm calling the subscribe
method from another thread (main
thread in Android).
所以看起来 Observable
类是同步的,默认情况下,它在发出事件(s.onNext
),对吗?我想知道......这是有意的行为还是我只是误解了一些东西?实际上,我期望至少 onNext
和 onComplete
回调将在调用者的线程上调用,而不是在发出事件的线程上调用.我是否正确理解在这种特殊情况下实际调用者的线程无关紧要?至少在异步生成事件时.
So looks like Observable
class is synchronous and by default and it performs everything (operators like map
+ notifying subscribers) on the same thread which emits events (s.onNext
), right? I wonder... is it intended behaviour or I just misunderstood something? Actually I was expecting that at least onNext
and onComplete
callbacks will be called on the caller's thread, not on the one emitting events. Do I understand correctly that in this particular case actual caller's thread doesn't matter? At least when events are generated asynchronously.
另一个问题 - 如果我从某个外部来源接收一些 Observable 作为参数怎么办(即我不是自己生成的)...作为它的用户,我无法检查它是否是同步的或异步,我只需要通过 subscribeOn
和 observeOn
方法明确指定我想要接收回调的位置,对吗?
Another concern - what if I receive some Observable as a parameter from some external source (i.e. I don't generate it on my own)... there is no way for me as its user to check if whether it is synchronous or asynchronous and I just have to explicitly specify where I want to receive callbacks via subscribeOn
and observeOn
methods, right?
谢谢!
推荐答案
RxJava 对并发没有意见.如果您不使用任何其他机制(如 observeOn/subscribeOn),它将在订阅线程上生成值.请不要在操作符中使用像 Thread 这样的低级结构,你可能会破坏契约.
RxJava is unopinionated about concurrency. It will produce values on the subscribing thread if you do not use any other mechanisem like observeOn/ subscribeOn. Please don't use low-level constructs like Thread in operators, you could break the contract.
由于使用了Thread,onNext会从调用线程('background-thread-1')中调用.订阅发生在调用(UI 线程)上.链中的每个操作符都将从background-thread-1"-calling-Thread 调用.订阅 onNext 也将从background-thread-1"调用.
Due to the use of Thread, the onNext will be called from the calling Thread ('background-thread-1'). The subscription happens on the calling (UI-Thread). Every operator down the chain will be called from 'background-thread-1'-calling-Thread. The subscription onNext will also be called from 'background-thread-1'.
如果您想生成不在调用线程上的值,请使用:subscribeOn.如果您想将线程切换回主线程,请在链中的某处使用observeOn.最有可能在订阅之前.
If you want to produce values not on the calling thread use: subscribeOn. If you want to switch the thread back to main use observeOn somewhere in the chain. Most likely before subscribing to it.
示例:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads
.subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
.map(integer -> integer) // map happens on Computational-Threads
.observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
.subscribe(integer -> {
// called from mainThread
});
这是一个很好的解释.http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html
这篇关于RxJava:可观察和默认线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!