如何在Rx中继续出现错误的流? [英] How to continue the stream with error in Rx?
问题描述
我正在使用Kotlin,RxJava和Retrofit开发一个Android应用程序. 我想将Http请求发送到服务器.
I am developing an Android app using Kotlin, RxJava, Retrofit. I want to send Http Request to the server.
PUT-作业的更新选项
PUT - update option of job
POST-运行作业
第一个请求成功后,我发送第二个请求.所以我用了concatMap.
After the first request success, then I send a second request. So I used concatMap.
val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST
updateJob.concatMap { runJob }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJob - success: $job")
}, {
Log.e(TAG, "runJob - failed: ${it.message}")
it.printStackTrace()
})
但是我不知道是否有多个工作,如下所示.
But I don't know in case of multiple jobs like below.
我有工作清单. 如果一个作业的更新"请求失败,则不应发送运行"请求. 但是下一项工作应该继续. 为此,我编写如下代码.
I have a job list. If one job's "update" request is failed, "run" request should not be sent. But the next job should continue. To do this, I make a code like below.
Observable.fromIterable(jobs.toList())
.concatMap { job ->
val updateJob = restService.updateJob(token, job.id, job) // HTTP PUT Request
val runJob = restService.runJob(token, job.id) // HTTP POST Request
updateJob.concatMap { runJob }
}
.window(1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
it.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Log.e(TAG, "run job - success")
}, {
Log.e(TAG, "run job - failed - 1: ${it.message}")
it.printStackTrace()
})
}, {
Log.e(TAG, "run job - failed - 2: ${it.message}")
it.printStackTrace()
})
我认为窗口"运算符可能是解决方案. 但这不是... 如果某些作业失败,则流将通过onError()结束. 我该如何解决这个问题?
I thought that "window" operator may be the solution. But it doesn't ... If some job is failed, the stream is over with onError(). How should I solve this problem?
推荐答案
我使用以下代码解决了此问题.
I resolve this issue using the following code.
Observable.fromIterable(jobs.toList())
.concatMap { job ->
val updateJob = restService.updateJob(token, job.id, job) // HTTP PUT Request
.onErrorResumeNext(Observable.empty<Job>()) // Solution Point
val runJob = restService.runJob(token, job.id) // HTTP POST Request
updateJob.concatMap { runJob }
}
// .window(2) I removed this line.
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Log.d(TAG, "run job - success")
}, {
Log.e(TAG, "run job - failed - 2: ${it.message}")
it.printStackTrace()
})
这篇关于如何在Rx中继续出现错误的流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!