Rx Java Android:如何将此回调块转换为观察者 [英] Rx Java Android : How to convert this callback block to Observer

查看:20
本文介绍了Rx Java Android:如何将此回调块转换为观察者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过 Amazon 的 S3 Android SDK 上传文件.我已经稍微使用过 RX Java,但我不确定如何将此方法转换为返回 Observable 的方法,因为我想将此方法的结果链接到另一个 Observable 调用.我想这让我感到困惑,因为这不会立即返回,并且在 OnError 或 OnState 更改之前无法返回.我如何以 RX 方式处理这些情况?

public void uploadFile(TransferObserver transferObserver){transferObserver.setTransferListener(new TransferListener() {@覆盖public void onStateChanged(int id, TransferState state) {}@覆盖public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {}@覆盖public void onError(int id, Exception ex) {}});}

如果有人能用 RX Java 2 和 lambdas 来回答那就太好了,因为我一直在这个问题上做得不够

解决方案

这通常是在异步/回调工作与反应性之间架起桥梁的正确方法,但现在使用 Observable.create()不鼓励,因为它需要高级知识才能使其正确.
您应该使用更新的创建方法 Observable.fromEmitter(),它看起来完全一样:

 return Observable.fromEmitter(new Action1>() {@覆盖公共无效调用(发射器<整数>发射器){transObs.setTransferListener(new TransferListener() {@覆盖public void onStateChanged(int id, TransferState state) {如果(状态 == TransferState.COMPLETED)发射器.onCompleted();}@覆盖public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {}@覆盖public void onError(int id, Exception ex) {发射器.onError(ex);}});发射器.setCancellation(新可取消(){@覆盖公共无效取消()抛出异常{//处理取消订阅://1. 注销监听器以避免内存泄漏//2. 取消上传}});}}, Emitter.BackpressureMode.DROP);

这里增加的是:处理unsusbcription:取消上传,取消注册以避免内存泄漏,并指定背压策略.
您可以在此处阅读更多信息.>

补充说明:

  • 如果你对进度感兴趣,你可以在 onProgressChanged() 调用 onNext() 并把 Observable 转换为 Observable.
  • 如果没有,您可能需要考虑使用 Completable,它是 Observable 且没有 onNext() 排放,但只有 onCompleted() 这可以如果您对进度指示不感兴趣,则适合您的情况.

I'm trying to upload a file via Amazon's S3 Android SDK. I've used RX Java a bit but I'm not sure how to convert this method to a method that returns an Observable because I want to chain the result of this method to another Observable call. It confuses me I suppose because of the fact that this does not return right away and can't return until either OnError or OnState changes. How do I handle these situations in an RX way?

public void uploadFile(TransferObserver transferObserver){

    transferObserver.setTransferListener(new TransferListener() {
        @Override
        public void onStateChanged(int id, TransferState state) {

        }

        @Override
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

        }

        @Override
        public void onError(int id, Exception ex) {

        }
  });

}

If someone could answer with RX Java 2 and lambdas that would be great because I just keep coming up short on this one

解决方案

This is generally the right approach to bridge between the async/callback workd to reactive, but using Observable.create() is now discouraged, as it's requires advanced knowledge in order to make it right.
You should use more recent create method Observable.fromEmitter(), which will look quite the same:

    return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
        @Override
        public void call(Emitter<Integer> emitter) {

            transObs.setTransferListener(new TransferListener() {
                @Override
                public void onStateChanged(int id, TransferState state) {
                    if (state == TransferState.COMPLETED)
                        emitter.onCompleted();
                }

                @Override
                public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                }

                @Override
                public void onError(int id, Exception ex) {
                    emitter.onError(ex);
                }
            });
            emitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    // Deal with unsubscription:
                    // 1. unregister the listener to avoid memory leak
                    // 2. cancel the upload 
                }
            });
        }
    }, Emitter.BackpressureMode.DROP);

What was added here is: dealing with unsusbcription: cancelling the upload, and unregistering to avoid memory leaks, and specifying backpressure strategy.
you can read more here.

additional notes:

  • if you interested with progress you can call onNext() with progress at onProgressChanged() and convert the Observable to Observable<Integer>.
  • if not, you might want to consider using Completable which is Observable with no onNext() emissions but only onCompleted() this can suits your case if your not interested with progress indications.

这篇关于Rx Java Android:如何将此回调块转换为观察者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆