如何使用 rxJava 实现一系列连续操作? [英] How to implement a sequence of consecutive operations using rxJava?

查看:33
本文介绍了如何使用 rxJava 实现一系列连续操作?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的下载过程由 3 个连续操作组成:preProcessdownloadingpostProcess.每个操作都具有异步性质(preProcess 调用 API,downloading 等待文件下载等).UI 必须显示正在执行的操作(例如,正在准备……"、正在下载……"、正在解包……").我将整个过程视为发出整个操作的当前状态的 Observable.每个操作也是一个 observable,它在执行开始时发出他的状态并在执行后完成.

I have the download process that consists of 3 consecutive operations: preProcess, downloading, postProcess. Each operation has asynchronous nature (preProcess calls API, downloading waits file to be downloaded etc). UI have to display which operations is executing (eg. "preparing...", "downloading...", "unpacking..."). I see whole process as Observable that emits current status of whole operation. Each operation is also an observable, that emits his status in the start of executions and completes after execution.

    Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.PRE_PROCESS);
            doPreProcess()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

    Observable<DownloadStatus> mDonwloadingOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(final Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.DOWNLOADING);
            doDownloading()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

    Observable<DownloadStatus> mPosProcessOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.POST_PROCESS);
            doPostProcess()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

一方面,每个操作都应该等到之前的操作完成.另一方面,订阅者需要接收每个发出的状态(例如 PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)

On the one hand each operation should wait until previous operations completes. On the other hand subscriber need to receive each emitted status (eg. PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)

我不能使用 merge 因为每个操作都应该依赖于前一个操作的完成.我不能使用 flatMap 因为我不知道如何传播发出的状态.我认为 Subject 可能是解决方案,但我也不知道如何传播已发出的状态.

I cannot use merge because each operation should depend on completion of previous one. I cannot use flatMap because i don't know how to propagate emitted status. I think that Subject could be the solution, but i also don't know how to propagate emitted status.

如何使用 rxJava 解决此类问题?感谢您提供任何想法/线索.

How can I solve such problem with rxJava? Thank for any ideas/clues.

推荐答案

concat 正是您所需要的.一旦前一个完成后,它就会订阅连接的可观察对象.

concat is what you need. This subscribes to the concatenated observable once the preceding one has finished.

concatMap 也像 flatMap 一样工作,但连接扁平的投影.这里有一个很好的图表介于两者之间.

concatMap also works like flatMap but concatenates the flattened projections. There's a nice diagram here on the difference between those two.

这篇关于如何使用 rxJava 实现一系列连续操作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆