如何从rxjava平面图中调用协程用例 [英] How to call a coroutine usecase from a rxjava flat map
问题描述
我有一个rxjava平面地图,我想在其中调用api调用的协程用例onStandUseCase
Hi I have a rxjava flat map in which I want to call a coroutine usecase onStandUseCase which is an api call
最初,用例也是基于rxjava的,它用于返回 Observable< GenericResponse>
,并且运行良好现在,我将用法更改为基于协程,它只返回 GenericResponse
Initially the use case was also rxjava based and it used to return Observable<GenericResponse>
and it worked fine
now that I changed the use to be coroutines based it only returns GenericResponse
请如何修改平面图以与协同程序用例配合使用
how can modify the flatmap to work fine with coroutines use case please
subscriptions += view.startFuellingObservable
.onBackpressureLatest()
.doOnNext { view.showLoader(false) }
.flatMap {
if (!hasOpenInopIncidents()) {
//THIS IS WHERE THE ERROR IS IT RETURNS GENERICRESPONSE
onStandUseCase(OnStandUseCase.Params("1", "2", TimestampedAction("1", "2", DateTime.now()))) {
}
} else {
val incidentOpenResponse = GenericResponse(false)
incidentOpenResponse.error = OPEN_INCIDENTS
Observable.just(incidentOpenResponse)
}
}
.subscribe(
{ handleStartFuellingClicked(view, it) },
{ onStartFuellingError(view) }
)
OnStandUseCase.kt
OnStandUseCase.kt
class OnStandUseCase @Inject constructor(
private val orderRepository: OrderRepository,
private val serviceOrderTypeProvider: ServiceOrderTypeProvider
) : UseCaseCoroutine<GenericResponse, OnStandUseCase.Params>() {
override suspend fun run(params: Params) = orderRepository.notifyOnStand(
serviceOrderTypeProvider.apiPathFor(params.serviceType),
params.id,
params.action
)
data class Params(val serviceType: String, val id: String, val action: TimestampedAction)
}
UseCaseCoroutine
UseCaseCoroutine
abstract class UseCaseCoroutine<out Type, in Params> where Type : Any {
abstract suspend fun run(params: Params): Type
operator fun invoke(params: Params, onResult: (type: Type) -> Unit = {}) {
val job = GlobalScope.async(Dispatchers.IO) { run(params) }
GlobalScope.launch(Dispatchers.Main) { onResult(job.await()) }
}
}
startFuellingObservable是
startFuellingObservable is
val startFuellingObservable: Observable<Void>
这是错误的图片
关于如何解决此问题的任何建议
Any suggestion on how to fix this please
先谢谢了R
推荐答案
有集成库链接RxJava和Kotlin协程.
There is the integration library linking RxJava and Kotlin coroutines.
rxSingle
can be used to turn a suspend function into a Single
. OP wants an Observable
, so we can call toObservable()
for the conversion.
.flatMap {
if (!hasOpenInopIncidents()) {
rxSingle {
callYourSuspendFunction()
}.toObservable()
} else {
val incidentOpenResponse = GenericResponse(false)
incidentOpenResponse.error = OPEN_INCIDENTS
Observable.just(incidentOpenResponse)
}
}
请注意,两个分支中的 Observable
都只包含一个元素.我们可以使用
Note that the Observable
s in both branches contain just one element. We can make this fact more obvious by using Observable#concatMapSingle.
.concatMapSingle {
if (!hasOpenInopIncidents()) {
rxSingle { callYourSuspendFunction() }
} else {
val incidentOpenResponse = GenericResponse(false)
incidentOpenResponse.error = OPEN_INCIDENTS
Single.just(incidentOpenResponse)
}
}
这篇关于如何从rxjava平面图中调用协程用例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!