处理 Reactor 中的并行通量 [英] Dealing with parallel flux in Reactor
问题描述
我从 iterable 创建了一个并行通量.在每个可迭代对象上,我都必须进行一次休息调用.但是在执行时,即使任何请求失败,所有剩余的请求也会失败.我希望无论失败或成功,都执行所有请求.
I have created a parallet flux from iterable. And on each iterable I have to make a rest call. But while executing even if any of the request fails , all the remaining requests also fail. I want all the requests to be executed irrespective of failure or success.
我目前正在使用 Flux.fromIterable 并使用 runOn 运算符
I am currently using Flux.fromIterable and using runOn operator
Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)
.sequential()
.subscribe();
我希望 iterable 中的所有请求都被执行,无论失败或成功.但到目前为止,有些被执行了,有些失败了.
I want all the requests in iterable to be executed , irrespective of the failure or success. But as of now some gets executed and some gets failed.
推荐答案
我通常使用三种可能的方法来实现这一点:
There's three possible ways I generally use to achieve this:
- 使用
flatMap()
的 3 参数版本,其中第二个是mapperOnError
-eg..flatMap(request -> someRemoteCall(), x->Mono.empty(), null)
; - 使用
onErrorResume(x -> Mono.empty())
作为单独的调用来忽略任何错误; - 使用
.onErrorResume(MyException.class, x -> Mono.empty()))
来忽略某种类型的错误.
- Use the 3 argument version of
flatMap()
, the second of which is amapperOnError
-eg..flatMap(request -> someRemoteCall(), x->Mono.empty(), null)
; - Use
onErrorResume(x -> Mono.empty())
as a separate call to ignore any error; - Use
.onErrorResume(MyException.class, x -> Mono.empty()))
to just ignore errors of a certain type.
第二个是我默认使用的,因为我觉得最清楚.
The second is what I tend to use by default, as I find that clearest.
这篇关于处理 Reactor 中的并行通量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!