Rx Java Android:如何将此回调块转换为Observer [英] Rx Java Android : How to convert this callback block to Observer

查看:129
本文介绍了Rx Java Android:如何将此回调块转换为Observer的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过Amazon的S3 Android SDK上传文件。我曾经使用过RX Java,但是我不确定如何将该方法转换为返回Observable的方法,因为我想将此方法的结果链接到另一个Observable调用。我猜想这让我感到困惑,因为它不会立即返回,并且只有在OnError或OnState发生更改后才能返回。如何通过RX处理这些情况?

I'm trying to upload a file via Amazon's S3 Android SDK. I've used RX Java a bit but I'm not sure how to convert this method to a method that returns an Observable because I want to chain the result of this method to another Observable call. It confuses me I suppose because of the fact that this does not return right away and can't return until either OnError or OnState changes. How do I handle these situations in an RX way?

public void uploadFile(TransferObserver transferObserver){

    transferObserver.setTransferListener(new TransferListener() {
        @Override
        public void onStateChanged(int id, TransferState state) {

        }

        @Override
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

        }

        @Override
        public void onError(int id, Exception ex) {

        }
  });

}

如果有人可以用RX Java 2和lambda回答,太好了,因为我只是对此一无所知。

If someone could answer with RX Java 2 and lambdas that would be great because I just keep coming up short on this one

推荐答案

通常,这是在异步/回调之间架起桥梁的正确方法可以进行反应,但是现在不建议使用 Observable.create(),因为它需要高级知识才能正确使用。

应该使用较新的创建方法 Observable.fromEmitter(),其外观将完全相同:

This is generally the right approach to bridge between the async/callback workd to reactive, but using Observable.create() is now discouraged, as it's requires advanced knowledge in order to make it right.
You should use more recent create method Observable.fromEmitter(), which will look quite the same:

    return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
        @Override
        public void call(Emitter<Integer> emitter) {

            transObs.setTransferListener(new TransferListener() {
                @Override
                public void onStateChanged(int id, TransferState state) {
                    if (state == TransferState.COMPLETED)
                        emitter.onCompleted();
                }

                @Override
                public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                }

                @Override
                public void onError(int id, Exception ex) {
                    emitter.onError(ex);
                }
            });
            emitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    // Deal with unsubscription:
                    // 1. unregister the listener to avoid memory leak
                    // 2. cancel the upload 
                }
            });
        }
    }, Emitter.BackpressureMode.DROP);

此处添加的内容是:处理取消抄写:取消上传,并注销以避免内存泄漏,并指定反压策略。

您可以阅读更多此处

What was added here is: dealing with unsusbcription: cancelling the upload, and unregistering to avoid memory leaks, and specifying backpressure strategy.
you can read more here.

附加说明:


  • 如果您对进度感兴趣,则可以在进度 onProgressChanged()处调用onNext()并将Observable转换为 Observable< Integer>

  • 如果不是,您可能要考虑使用 Completable ,该值在没有 onNext( )排放,但只有 onCompleted()可以满足您的情况,如果您对进度指示不感兴趣。

  • if you interested with progress you can call onNext() with progress at onProgressChanged() and convert the Observable to Observable<Integer>.
  • if not, you might want to consider using Completable which is Observable with no onNext() emissions but only onCompleted() this can suits your case if your not interested with progress indications.

这篇关于Rx Java Android:如何将此回调块转换为Observer的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆