处理 Reactor 中的并行通量 [英] Dealing with parallel flux in Reactor

查看:53
本文介绍了处理 Reactor 中的并行通量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我从 iterable 创建了一个并行通量.在每个可迭代对象上,我都必须进行一次休息调用.但是在执行时,即使任何请求失败,所有剩余的请求也会失败.我希望无论失败或成功,都执行所有请求.

I have created a parallet flux from iterable. And on each iterable I have to make a rest call. But while executing even if any of the request fails , all the remaining requests also fail. I want all the requests to be executed irrespective of failure or success.

我目前正在使用 Flux.fromIterable 并使用 runOn 运算符

I am currently using Flux.fromIterable and using runOn operator

Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)     
.sequential()
.subscribe();

我希望 iterable 中的所有请求都被执行,无论失败或成功.但到目前为止,有些被执行了,有些失败了.

I want all the requests in iterable to be executed , irrespective of the failure or success. But as of now some gets executed and some gets failed.

推荐答案

我通常使用三种可能的方法来实现这一点:

There's three possible ways I generally use to achieve this:

  • 使用 flatMap() 的 3 参数版本,其中第二个是 mapperOnError -eg..flatMap(request -> someRemoteCall(), x->Mono.empty(), null);
  • 使用 onErrorResume(x -> Mono.empty()) 作为单独的调用来忽略任何错误;
  • 使用 .onErrorResume(MyException.class, x -> Mono.empty())) 来忽略某种类型的错误.
  • Use the 3 argument version of flatMap(), the second of which is a mapperOnError -eg. .flatMap(request -> someRemoteCall(), x->Mono.empty(), null);
  • Use onErrorResume(x -> Mono.empty()) as a separate call to ignore any error;
  • Use .onErrorResume(MyException.class, x -> Mono.empty())) to just ignore errors of a certain type.

第二个是我默认使用的,因为我觉得最清楚.

The second is what I tend to use by default, as I find that clearest.

这篇关于处理 Reactor 中的并行通量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆