如何使用 rxJava 实现一系列连续操作? [英] How to implement a sequence of consecutive operations using rxJava?
问题描述
我的下载过程由 3 个连续操作组成:preProcess
、downloading
、postProcess
.每个操作都具有异步性质(preProcess
调用 API,downloading
等待文件下载等).UI 必须显示正在执行的操作(例如,正在准备……"、正在下载……"、正在解包……").我将整个过程视为发出整个操作的当前状态的 Observable
.每个操作也是一个 observable,它在执行开始时发出他的状态并在执行后完成.
I have the download process that consists of 3 consecutive operations: preProcess
, downloading
, postProcess
. Each operation has asynchronous nature (preProcess
calls API, downloading
waits file to be downloaded etc). UI have to display which operations is executing (eg. "preparing...", "downloading...", "unpacking...").
I see whole process as Observable
that emits current status of whole operation. Each operation is also an observable, that emits his status in the start of executions and completes after execution.
Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.PRE_PROCESS);
doPreProcess()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
Observable<DownloadStatus> mDonwloadingOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(final Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.DOWNLOADING);
doDownloading()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
Observable<DownloadStatus> mPosProcessOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.POST_PROCESS);
doPostProcess()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
一方面,每个操作都应该等到之前的操作完成.另一方面,订阅者需要接收每个发出的状态(例如 PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)
On the one hand each operation should wait until previous operations completes. On the other hand subscriber need to receive each emitted status (eg. PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)
我不能使用 merge
因为每个操作都应该依赖于前一个操作的完成.我不能使用 flatMap
因为我不知道如何传播发出的状态.我认为 Subject
可能是解决方案,但我也不知道如何传播已发出的状态.
I cannot use merge
because each operation should depend on completion of previous one.
I cannot use flatMap
because i don't know how to propagate emitted status. I think that Subject
could be the solution, but i also don't know how to propagate emitted status.
如何使用 rxJava 解决此类问题?感谢您提供任何想法/线索.
How can I solve such problem with rxJava? Thank for any ideas/clues.
推荐答案
concat代码>
正是您所需要的.一旦前一个完成后,它就会订阅连接的可观察对象.
concat
is what you need. This subscribes to the concatenated observable once the preceding one has finished.
concatMap
也像 flatMap
一样工作,但连接扁平的投影.这里有一个很好的图表介于两者之间.
concatMap
also works like flatMap
but concatenates the flattened projections. There's a nice diagram here on the difference between those two.
这篇关于如何使用 rxJava 实现一系列连续操作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!