RxJava 2:总是取消订阅 .subscribeOn(..) 调度程序? [英] RxJava 2: always unsubscribe on the .subscribeOn(..) scheduler?
问题描述
我有一个 Observable
来执行一些工作.完成后,它关闭其连接(通过 .setDisposable(Disposables.fromAction(logic*);
).问题是,我需要在与实际工作负载相同的调度程序上执行此关闭操作.
I have an Observable<String>
that performs some work. After its done, it closes its connections (via .setDisposable(Disposables.fromAction(logic*);
. Trouble is, I need this close action to be performed on the same scheduler as the actual workload.
有可能吗?怎么样?
我不想强制 Observable 在给定的调度器上执行,例如 myObservable.subscribeOn(x).unsubscribeOn(x);
;
推荐答案
您需要一个单线程调度程序才能工作,您可以通过 Schedulers.from(Executor)
或 new 获得一个>SingleScheduler
(警告:内部).
You need a single threaded scheduler for that to work and you can get one via Schedulers.from(Executor)
or new SingleScheduler
(warning: internal).
unsubscribeOn
的问题在于,它仅在下游实际处理/取消流时才运行(与 1.x 几乎一直发生这种情况不同).
The problem with unsubscribeOn
is that it runs only if the downstream actually disposes/cancels the stream (unlike 1.x where this happens almost all the time).
更好的方法是将 using
与上述自定义调度程序一起使用并手动安排清理:
A better way is to use using
with the aforementioned custom scheduler and manually schedule the cleanup:
Observable<T> createObservable(Scheduler scheduler) {
return Observable.create(s -> {
Resource res = ...
s.setCancellable(() ->
scheduler.scheduleDirect(() ->
res.close() // may need try-catch here
)
);
s.onNext(...);
s.onComplete();
}).subscribeOn(scheduler);
}
Scheduler scheduler = new SingleScheduler();
createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);
但请注意,除非 create
逻辑放弃调度程序(通过终止或异步),否则取消逻辑可能永远不会由于同池活锁而执行.
But note that unless the create
logic gives up the scheduler (by terminating or going async), the cancellation logic may not ever execute due to same-pool livelock.
这篇关于RxJava 2:总是取消订阅 .subscribeOn(..) 调度程序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!