如何订阅和取消订阅或取消 rxjava observable [英] How to subscribe and unsubscribe or cancel an rxjava observable

查看:229
本文介绍了如何订阅和取消订阅或取消 rxjava observable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 RxJava 新手,正在尝试将我的 asyncTask 工作更新为 RxJava.作为第一次尝试,我完成了以下代码:

 公共类 MainActivity 扩展 AppCompatActivity{@覆盖protected void onCreate(Bundle savedInstanceState){super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);做一些工作();}私有字符串 funcCallServerGet(){//调用HttpClient Get方法的一些代码&返回响应字符串//这是我以前在 asynctask doInbackground 方法中调用的方法}私有无效 doSomeWork() {getSingleObservable().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(getSingleObserver()) ;}私人单<字符串>getSingleObservable(){return Single.create(new SingleOnSubscribe() {@覆盖公共无效订阅(SingleEmitter发射器)抛出异常{if(!emitter.isDisposed()) {String strRxResponse = funcCallServerGet();发射器.onSuccess(strRxResponse);}}});}私有 SingleObservergetSingleObserver(){返回新的 SingleObserver() {@覆盖公共无效订阅(一次性d){Log.d(TAG, " onSubscribe getSingleObserver: " + d.isDisposed());}@覆盖公共无效onSuccess(字符串值){Log.d(TAG, " onNext : value : " + value);}@覆盖public void onError(Throwable e) {Log.d(TAG, " onError : " + e.getMessage());}};}}

但我有一些困惑:

  1. 为什么我在 SingleObserver getSingleObserver()onSubscribe() 中得到 false .

  2. 当活动 onStop() 被调用时,我如何取消订阅或取消 observable/observer.

  3. 此外,屏幕定向时会发生什么.observable 是自动取消订阅还是继续工作?设备旋转该怎么做?

解决方案

为什么我在 SingleObserver getSingleObserver() 的 onSubscribe() 中得到 false .

您当前正在记录是否在 onSubscribe 方法中处置了一次性物品.在这一点上,一次性物品还没有被处理掉.

<块引用>

当活动 onStop() 被调用时,我如何取消订阅或取消 observable/observer.

您可以使用返回一次性的订阅方法,而不是使用 SingleObserver.有了这个,您可以直接管理一次性用品或使用 CompositeDisposable.然后,您将对该一次性物品调用 dispose 方法,使用 CompositeDisposable,这是通过调用 clear()

来实现的

private final CompositeDisposabledisposables = new CompositeDisposable();@覆盖受保护的无效 onStart() {super.onStart();一次性用品.添加(getSingleObservable().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).订阅(值 -> {Log.d(TAG, " onSuccess: " + value);}, 错误 ->{Log.e(TAG, " onError", 错误);}));}@覆盖受保护的无效 onStop() {一次性用品.clear();super.onStop();}

<块引用>

此外,屏幕定向时会发生什么.observable 是自动取消订阅还是继续工作?设备旋转怎么办?

默认情况下不会自动管理 observable,管理它是你的责任.在您的示例代码中,当设备旋转时,您将收到另一个对 onCreate 的调用,在这里您正在安排要再次执行的工作,在旋转之前安排的工作仍然可以运行,因此您最终可能会泄漏旧活动并接收工作成功或失败时的回调 - 在这种情况下,您会看到一条日志语句.

有一些工具可以提供自动可观察性管理,不过您应该阅读作者关于这种方法存在的一些问题的文章.

您的另一个选择是查看新的架构组件库,特别是 ViewModel 和 LiveData.这将简化您在订阅管理和配置更改方面需要做的事情.

I am new in RxJava and trying to update my asyncTask works to RxJava. As a first try I have done the following codes:

    public class MainActivity extends AppCompatActivity 
    {
        @Override
        protected void onCreate(Bundle savedInstanceState) 
        {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);

            doSomeWork();
        }


        private String funcCallServerGet()
        {
            //Some code to call a HttpClient Get method & return a response string
            //this is the method which previously i used to call inside asynctask doInbackground method
        }


          private void doSomeWork() {
               getSingleObservable()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(getSingleObserver())    ;
        }


        private Single<String> getSingleObservable() 
        {
            return Single.create(new SingleOnSubscribe<String>() {
                @Override
                public void subscribe(SingleEmitter<String> emitter) throws Exception {
                    if(!emitter.isDisposed()) {
                        String strRxResponse =  funcCallServerGet();
                        emitter.onSuccess(strRxResponse);
                    }
                }
            });
        }


         private SingleObserver<String> getSingleObserver() 
        {

            return new SingleObserver<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, " onSubscribe getSingleObserver: " + d.isDisposed());             }

                @Override
                public void onSuccess(String value) {
                    Log.d(TAG, " onNext : value : " + value);          }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, " onError : " + e.getMessage());         }
            };
        }

    }

But I have some confusions:

  1. Why am I getting false in onSubscribe() of SingleObserver getSingleObserver() .

  2. How do I unsubscribe or cancel the observable/observer when activities onStop() is called.

  3. Also, what really happens when screen oriantation. Does the observable get unsubscribed automatically or it continues its work ? what to do for the device rotation ?

解决方案

Why am I getting false in onSubscribe() of SingleObserver getSingleObserver() .

You're currently logging whether the disposable is disposed within the onSubscribe method. At this point the disposable hasn't been disposed yet.

How do I unsubscribe or cancel the observable/observer when activities onStop() is called.

Rather than use a SingleObserver you could use the subscribe method which returns a disposable. With this you could either manage the disposable directly or use a CompositeDisposable. You would then call the dispose method on that disposable, with CompositeDisposable this is achieved by calling clear()

private final CompositeDisposable disposables = new CompositeDisposable();

@Override
protected void onStart() {
    super.onStart();
    disposables.add(getSingleObservable()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(value -> {
                Log.d(TAG, " onSuccess: " + value);
            }, error -> {
                Log.e(TAG, " onError", error);
            }
        )
    );
}

@Override
protected void onStop() {
    disposables.clear();
    super.onStop();
}

Also, what really happens when screen oriantation. Does the observable get unsubscribed automatically or it continues its work ? what to do for the device rotation ?

By default no automatic management of the observable occurs, it's your responsibility to manage it. In your example code when the device rotates you will receive another call to onCreate, here you're scheduling the work to be executed again, work that was scheduled before rotation could still be running, so you could end up leaking the old activity and receiving a callback when the work succeeds or fails - in this case you'd see a log statement.

There are some tools that provide automatic observable management, though you should read the authors article about some of the issues that exist with this approach.

Another option for you could be to look at the new Architecture Components library, specifically ViewModel and LiveData. This will simplify what you need to do with respect to subscription management and configuration changes.

这篇关于如何订阅和取消订阅或取消 rxjava observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆