RxJava2 .subscribeOn .observeOn混乱.在主线程上运行 [英] RxJava2 .subscribeOn .observeOn confusion. Running on Main thread

查看:70
本文介绍了RxJava2 .subscribeOn .observeOn混乱.在主线程上运行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个调用Web服务的方法,我认为该方法在IO线程上运行,直到服务停止并且UI冻结为止.

I had a method calling a webservice which I thought was running on IO thread until the service stopped and the UI froze.

所以我开始了一些简单的测试来检查线程

So I started some simple testing to check threading

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'


public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })

                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}

尽管订阅并观察到了,但还是产生了以下输出,指示一切都在主线程上运行?

Is resulting in the following output indicating everything is running on the main thread, despite having subscribe and observe ?

Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main

但是 如果我按以下方式将observeOn移到.subscribeWith之前,那么...

BUT If I move the observeOn to immediately before the .subscribeWith as follows...

public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}

输出是我正在寻找的内容,即使阅读了许多有关RxJava的博客,我也必须说这使我感到困惑.

The output is what I am looking for which I must say is confusing me even after reading many blogs about RxJava.

Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main

我已经剥离了原始方法,直到它几乎是Blog帖子中示例方法的副本

I've stripped my original method back until it's pretty much a copy of an example method on a Blog post

像老板一样的多线程

表示这应该在IO线程上运行loadPerson(),并在主线程上发出.没有.

which implies that this should run the loadPerson() on the IO thread, emitting on the main thread. It doesn't.

disposableRx.add(
        repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableMaybeObserver<String>() {

                    @Override
                    public void onSuccess(@NonNull String response) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onComplete() {

                    }
                })
);

从我的方法中转出线程是否表明它正在主线程上运行?

Dumping out the thread from within my method shows that it's running on the Main thread?

这是什么原因造成的?

推荐答案

放置observeOn()sunbscribeOn()顺序和其他运算符非常重要 >.

The order in which you put observeOn() and sunbscribeOn() , and other operators is very important.

  • subscribeOn()运算符告诉源Observable 在哪个线程上发射和转换项目.

  • subscribeOn() operator tells the source Observable which thread to emit and transform items on.

请注意将observeOn()运算符放在何处,因为它会发生变化 执行工作的线程!在大多数情况下,您可能希望延迟 切换到观察线程,直到Rx链的尽头.

Be careful where you put the observeOn() operator because it changes the thread performing the work! In most cases you probably want to delay switching to the observing thread until the very end of your Rx chain.

Observable.just("long", "longer", "longest")
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .map(String::length)
    .filter(length -> length == 6)
    .subscribe(length -> System.out.println("item length " + length));

此处,地图,过滤和使用是在主线程

here the map, filter and consuming is performed in the main thread

    之前的
  • observeOn() 没有理由在map()运算符上方应用observeOn()运算符. 实际上,此代码将导致NetworkOnMainThreadException!我们不想从主线程上的HTTP响应中读取消息-应该在切换回主线程之前完成此操作.

  • observeOn() before map() There is no reason to have observeOn() operator applied above the map() operator. In fact, this code will result in NetworkOnMainThreadException! We do not want to be reading from HTTP response on the main thread — it should be done before we switch back to the main thread.

您还可以使用多个watchOn()来切换线程,例如本例.

you can also use multiple observeOn() to switch threads like this example.

 Observable.just("long", "longer", "longest")
.doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(String::toString)
.doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.io())
.map(String::toString)
.subscribeOn(Schedulers.newThread())
.map(String::length)
.subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));

输出:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

注意 根据此答案 subscribeOn()不适用于下游运算符,因此不能保证您的操作将在其他线程上进行.

Note according to this answer subscribeOn() does't apply to the downstream operators, Therefore it does not guarantee that your operation is going to be on a different Thread.

subscribeOn效果向上游延伸并更靠近 事件.

subscribeOn effects go upstream and closer to the source of events.

关于您的问题,我已经进行了测试,这是结果

As for your problem I have made a test and here are the results

   private void testRxJava2Async() {
 io.reactivex.Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {

            Log.d(TAG,"callable (expensive assync method) was called on --> "+Thread.currentThread().getName());

            return null;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new Observer<String>() {


                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {

                    Log.d(TAG,"onNext() was called on --> "+Thread.currentThread().getName());

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

测试结果:

callable (expensive assync method) was called on --> RxCachedThreadScheduler-1
onNext() was called on--> main

这篇关于RxJava2 .subscribeOn .observeOn混乱.在主线程上运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆