Rx Java Android:如何将此回调块转换为Observer [英] Rx Java Android : How to convert this callback block to Observer
问题描述
我正在尝试通过Amazon的S3 Android SDK上传文件。我曾经使用过RX Java,但是我不确定如何将该方法转换为返回Observable的方法,因为我想将此方法的结果链接到另一个Observable调用。我猜想这让我感到困惑,因为它不会立即返回,并且只有在OnError或OnState发生更改后才能返回。如何通过RX处理这些情况?
I'm trying to upload a file via Amazon's S3 Android SDK. I've used RX Java a bit but I'm not sure how to convert this method to a method that returns an Observable because I want to chain the result of this method to another Observable call. It confuses me I suppose because of the fact that this does not return right away and can't return until either OnError or OnState changes. How do I handle these situations in an RX way?
public void uploadFile(TransferObserver transferObserver){
transferObserver.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
}
});
}
如果有人可以用RX Java 2和lambda回答,太好了,因为我只是对此一无所知。
If someone could answer with RX Java 2 and lambdas that would be great because I just keep coming up short on this one
推荐答案
通常,这是在异步/回调之间架起桥梁的正确方法可以进行反应,但是现在不建议使用 Observable.create()
,因为它需要高级知识才能正确使用。
应该使用较新的创建方法 Observable.fromEmitter()
,其外观将完全相同:
This is generally the right approach to bridge between the async/callback workd to reactive, but using Observable.create()
is now discouraged, as it's requires advanced knowledge in order to make it right.
You should use more recent create method Observable.fromEmitter()
, which will look quite the same:
return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
@Override
public void call(Emitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
// Deal with unsubscription:
// 1. unregister the listener to avoid memory leak
// 2. cancel the upload
}
});
}
}, Emitter.BackpressureMode.DROP);
此处添加的内容是:处理取消抄写:取消上传,并注销以避免内存泄漏,并指定反压策略。
您可以阅读更多此处。
What was added here is: dealing with unsusbcription: cancelling the upload, and unregistering to avoid memory leaks, and specifying backpressure strategy.
you can read more here.
附加说明:
- 如果您对进度感兴趣,则可以在进度
onProgressChanged()
处调用onNext()并将Observable转换为Observable< Integer>
。 - 如果不是,您可能要考虑使用
Completable
,该值在没有onNext( )
排放,但只有onCompleted()
可以满足您的情况,如果您对进度指示不感兴趣。
- if you interested with progress you can call onNext() with progress at
onProgressChanged()
and convert the Observable toObservable<Integer>
. - if not, you might want to consider using
Completable
which is Observable with noonNext()
emissions but onlyonCompleted()
this can suits your case if your not interested with progress indications.
这篇关于Rx Java Android:如何将此回调块转换为Observer的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!