等待运行 Reactor Mono 实例完成 [英] Waiting for running Reactor Mono instances to complete
本文介绍了等待运行 Reactor Mono 实例完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我写这段代码是为了剥离大量的WebClients(受限于reactor.ipc.netty.workerCount
),立即启动Mono,等待所有Monos完成:>
I wrote this code to spin off a large number of WebClients (limited by reactor.ipc.netty.workerCount
), start the Mono immediately, and wait for the all Monos to complete:
List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
for (MetricConfig metricConfig : metricConfigs) {
try {
monos.add(extractMetrics.queryMetricData(metricConfig)
.doOnSuccess(result -> {
metricDataList.addAll(result);
})
.cache());
} catch (Exception e) {
}
}
Mono.when(monos)
.doFinally(onFinally -> {
Map<String, Date> latestMap;
try {
latestMap = extractInsights.queryInsights();
Transform transform = new Transform(copierConfig.getEventType());
ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
} catch (Exception e) {
log.error("copy: mono: when: {}", e.getMessage(), e);
}
})
.block();
它有效",即结果符合预期.
It 'works', that is the results are as expected.
两个问题:
- 这是正确的吗?
cache()
是否会导致when
等待所有 Monos 完成? - 它高效吗?有没有办法让它更快?
- Is this correct? Does
cache()
result in thewhen
waiting for all Monos to complete? - Is it efficient? Is there a way to make this faster?
谢谢.
推荐答案
你应该尽可能地尝试:
- 使用 Reactor 运算符并组成单个反应链
- 避免将
doOn*
运算符用于副作用(如日志记录)以外的其他用途 - 避免共享状态
- use Reactor operators and compose a single reactive chain
- avoid using
doOn*
operators for something other than side-effects (like logging) - avoid shared state
您的代码可能看起来更像
Your code could look a bit more like
List<MetricConfig> metricConfigs = //...
Mono<List<MetricDataModel>> data = Flux.fromIterable(metricConfigs)
.flatMap(config -> extractMetrics.queryMetricData(config))
.collectList();
此外,cache()
操作符不会等待流完成(这实际上是 then()
的工作).
Also, the cache()
operator does not wait the completion of the stream (that's actually then()
's job).
这篇关于等待运行 Reactor Mono 实例完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文