如何管理 RxJava 中所需的 Looper 线程 [英] How to manage Looper thread needed in RxJava
问题描述
我想在 Observable
中封装一个查询 ContentProvider
并订阅 ContentProvider
游标以提供持续更新的逻辑.
I want to encapsulate in an Observable
a logic that queries a ContentProvider
and also subscribes ContentProvider
cursor to supply continuous updates.
由于 observable 执行 IO 工作,我需要在 Schedulers.io()
中订阅它.问题是我无法注册 ContentObserver
因为它需要一个 Looper 准备好的线程.
As the observable does IO work I need to subscribe it in Schedulers.io()
. The problem with that is that then I can't register a ContentObserver
because it needs a looper prepared thread.
推荐的管理方式是什么,并将其封装到单个 Observable
中.
What is the recommended way to manage that and encapsulate it in a single Observable
.
代码说明:
public Observable<Integer> unreadCountObservable() {
return Observable.create(subscriber -> {
new UnreadCountObservable(subscriber);
});
}
private class UnreadCountObservable {
private Subscriber subscriber;
public UnreadCountObservable(Subscriber subscriber) {
this.subscriber = subscriber;
Cursor cursor = queryUnread(subscriber);
cursor.registerContentObserver(observer);
subscriber.add(Subscriptions.create(() -> {
cursor.unregisterContentObserver(observer);
cursor.close();
}));
}
@NonNull
private Cursor queryUnread(Subscriber subscriber) {
Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null);
if(cursor.moveToNext()) {
Integer count = cursor.getInt(0);
subscriber.onNext(count);
} else {
subscriber.onNext(0);
}
return cursor;
}
private ContentObserver observer = new ContentObserver(new Handler()) {
@Override
public boolean deliverSelfNotifications() {
return false;
}
@Override
public void onChange(boolean selfChange) {
Timber.d("New sms data changed");
queryUnread(subscriber);
}
};
}
注1 上面代码的问题是,由于registerObserver的原因,不能用.subscribeOn(Schedulers.io()
调用,如果是将其称为 mainThread 然后查询也在它们上运行)
Note 1 The problem with the above code is that it can't be called with .subscribeOn(Schedulers.io()
due to the registerObserver, and if it's called it mainThread then the queries also run on them)
注意:将所有内容封装在一个 Observable
中是一个关键要求,也是这个问题的动机
Note: Encapsulating all in a single Observable
is a key requirement and the motive of this question
我现在最好的想法是为活动创建一个 HandlerThread,在那里我使用 Observable 并使用来自该线程的 Looper.但是想知道是否有更好的替代方案,以及制作通用调度程序(例如 looperIoScheduler())是否有意义可能会导致问题.
My best idea now is to create a HandlerThread for activity where I use the Observable and use the looper from that thread. But want to know if there are better alternatives, and if making a generic scheduler (e.g looperIoScheduler()) makes sense on can cause problems.
推荐答案
在 Observable
链中,您可以随时更改线程.看看这里.
In Observable
chain you can change the thread as often as you wish. Have a look here.
函数rx.Observable#observeOn(rx.Scheduler)
可以在链内的任何地方.尝试做这样的事情(伪代码):
The function rx.Observable#observeOn(rx.Scheduler)
can be in any place inside the chain. Try to do something like this (pseudocode):
Observable.just(cursor)
.observeOn(AndroidSchedulers.mainThread())
.map((Cursor) -> {
cursor.registerContentObserver(observer);
return cursor;
}
}).observeOn(Schedulers.io());
subscriber.add(Subscriptions.create(() -> {
cursor.unregisterContentObserver(observer);
cursor.close();
}));
这篇关于如何管理 RxJava 中所需的 Looper 线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!