Rx Java Android:如何将此回调块转换为观察者 [英] Rx Java Android : How to convert this callback block to Observer
问题描述
我正在尝试通过 Amazon 的 S3 Android SDK 上传文件.我已经稍微使用过 RX Java,但我不确定如何将此方法转换为返回 Observable 的方法,因为我想将此方法的结果链接到另一个 Observable 调用.我想这让我感到困惑,因为这不会立即返回,并且在 OnError 或 OnState 更改之前无法返回.我如何以 RX 方式处理这些情况?
public void uploadFile(TransferObserver transferObserver){transferObserver.setTransferListener(new TransferListener() {@覆盖public void onStateChanged(int id, TransferState state) {}@覆盖public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {}@覆盖public void onError(int id, Exception ex) {}});}
如果有人能用 RX Java 2 和 lambdas 来回答那就太好了,因为我一直在这个问题上做得不够
这通常是在异步/回调工作与反应性之间架起桥梁的正确方法,但现在使用 Observable.create()
不鼓励,因为它需要高级知识才能使其正确.
您应该使用更新的创建方法 Observable.fromEmitter()
,它看起来完全一样:
return Observable.fromEmitter(new Action1>() {@覆盖公共无效调用(发射器<整数>发射器){transObs.setTransferListener(new TransferListener() {@覆盖public void onStateChanged(int id, TransferState state) {如果(状态 == TransferState.COMPLETED)发射器.onCompleted();}@覆盖public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {}@覆盖public void onError(int id, Exception ex) {发射器.onError(ex);}});发射器.setCancellation(新可取消(){@覆盖公共无效取消()抛出异常{//处理取消订阅://1. 注销监听器以避免内存泄漏//2. 取消上传}});}}, Emitter.BackpressureMode.DROP);
这里增加的是:处理unsusbcription:取消上传,取消注册以避免内存泄漏,并指定背压策略.
您可以在此处阅读更多信息.>
补充说明:
- 如果你对进度感兴趣,你可以在
onProgressChanged()
调用 onNext() 并把 Observable 转换为Observable
. - 如果没有,您可能需要考虑使用
Completable
,它是 Observable 且没有onNext()
排放,但只有onCompleted()
这可以如果您对进度指示不感兴趣,则适合您的情况.
I'm trying to upload a file via Amazon's S3 Android SDK. I've used RX Java a bit but I'm not sure how to convert this method to a method that returns an Observable because I want to chain the result of this method to another Observable call. It confuses me I suppose because of the fact that this does not return right away and can't return until either OnError or OnState changes. How do I handle these situations in an RX way?
public void uploadFile(TransferObserver transferObserver){
transferObserver.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
}
});
}
If someone could answer with RX Java 2 and lambdas that would be great because I just keep coming up short on this one
This is generally the right approach to bridge between the async/callback workd to reactive, but using Observable.create()
is now discouraged, as it's requires advanced knowledge in order to make it right.
You should use more recent create method Observable.fromEmitter()
, which will look quite the same:
return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
@Override
public void call(Emitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
// Deal with unsubscription:
// 1. unregister the listener to avoid memory leak
// 2. cancel the upload
}
});
}
}, Emitter.BackpressureMode.DROP);
What was added here is: dealing with unsusbcription: cancelling the upload, and unregistering to avoid memory leaks, and specifying backpressure strategy.
you can read more here.
additional notes:
- if you interested with progress you can call onNext() with progress at
onProgressChanged()
and convert the Observable toObservable<Integer>
. - if not, you might want to consider using
Completable
which is Observable with noonNext()
emissions but onlyonCompleted()
this can suits your case if your not interested with progress indications.
这篇关于Rx Java Android:如何将此回调块转换为观察者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!