如何管理 RxJava 中所需的 Looper 线程 [英] How to manage Looper thread needed in RxJava

查看:40
本文介绍了如何管理 RxJava 中所需的 Looper 线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在 Observable 中封装一个查询 ContentProvider 并订阅 ContentProvider 游标以提供持续更新的逻辑.

I want to encapsulate in an Observable a logic that queries a ContentProvider and also subscribes ContentProvider cursor to supply continuous updates.

由于 observable 执行 IO 工作,我需要在 Schedulers.io() 中订阅它.问题是我无法注册 ContentObserver 因为它需要一个 Looper 准备好的线程.

As the observable does IO work I need to subscribe it in Schedulers.io(). The problem with that is that then I can't register a ContentObserver because it needs a looper prepared thread.

推荐的管理方式是什么,并将其封装到单个 Observable 中.

What is the recommended way to manage that and encapsulate it in a single Observable.

代码说明:

public Observable<Integer> unreadCountObservable() {
    return Observable.create(subscriber -> {
        new UnreadCountObservable(subscriber);
    });
}

private class UnreadCountObservable {
    private Subscriber subscriber;

    public UnreadCountObservable(Subscriber subscriber) {
        this.subscriber = subscriber;
        Cursor cursor = queryUnread(subscriber);
        cursor.registerContentObserver(observer);
        subscriber.add(Subscriptions.create(() -> {
            cursor.unregisterContentObserver(observer);
            cursor.close();
        }));
    }

    @NonNull
    private Cursor queryUnread(Subscriber subscriber) {
        Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null);
        if(cursor.moveToNext()) {
            Integer count = cursor.getInt(0);
            subscriber.onNext(count);
        } else {
            subscriber.onNext(0);
        }
        return cursor;
    }

    private ContentObserver observer = new ContentObserver(new Handler()) {
        @Override
        public boolean deliverSelfNotifications() {
            return false;
        }

        @Override
        public void onChange(boolean selfChange) {
            Timber.d("New sms data changed");
            queryUnread(subscriber);
        }
    };
}

注1 上面代码的问题是,由于registerObserver的原因,不能用.subscribeOn(Schedulers.io()调用,如果是将其称为 mainThread 然后查询也在它们上运行)

Note 1 The problem with the above code is that it can't be called with .subscribeOn(Schedulers.io() due to the registerObserver, and if it's called it mainThread then the queries also run on them)

注意:将所有内容封装在一个 Observable 中是一个关键要求,也是这个问题的动机

Note: Encapsulating all in a single Observable is a key requirement and the motive of this question

我现在最好的想法是为活动创建一个 HandlerThread,在那里我使用 Observable 并使用来自该线程的 Looper.但是想知道是否有更好的替代方案,以及制作通用调度程序(例如 looperIoScheduler())是否有意义可能会导致问题.

My best idea now is to create a HandlerThread for activity where I use the Observable and use the looper from that thread. But want to know if there are better alternatives, and if making a generic scheduler (e.g looperIoScheduler()) makes sense on can cause problems.

推荐答案

Observable 链中,您可以随时更改线程.看看这里.

In Observable chain you can change the thread as often as you wish. Have a look here.

函数rx.Observable#observeOn(rx.Scheduler) 可以在链内的任何地方.尝试做这样的事情(伪代码):

The function rx.Observable#observeOn(rx.Scheduler) can be in any place inside the chain. Try to do something like this (pseudocode):

Observable.just(cursor)
        .observeOn(AndroidSchedulers.mainThread())
        .map((Cursor) -> {
                cursor.registerContentObserver(observer);
                return cursor;
            }
        }).observeOn(Schedulers.io());
subscriber.add(Subscriptions.create(() -> {
        cursor.unregisterContentObserver(observer);
        cursor.close();
    }));

这篇关于如何管理 RxJava 中所需的 Looper 线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆