等待运行 Reactor Mono 实例完成 [英] Waiting for running Reactor Mono instances to complete

查看:68
本文介绍了等待运行 Reactor Mono 实例完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我写这段代码是为了剥离大量的WebClients(受限于reactor.ipc.netty.workerCount),立即启动Mono,等待所有Monos完成:

I wrote this code to spin off a large number of WebClients (limited by reactor.ipc.netty.workerCount), start the Mono immediately, and wait for the all Monos to complete:

   List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
   for (MetricConfig metricConfig : metricConfigs) {
        try {
            monos.add(extractMetrics.queryMetricData(metricConfig)
                  .doOnSuccess(result -> {
                      metricDataList.addAll(result);
                  })
                  .cache());
        } catch (Exception e) {
        }
    }

    Mono.when(monos)
          .doFinally(onFinally -> {
              Map<String, Date> latestMap;
              try {
                  latestMap = extractInsights.queryInsights();
                  Transform transform = new Transform(copierConfig.getEventType());
                  ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
              } catch (Exception e) {
                  log.error("copy: mono: when: {}", e.getMessage(), e);
              }
          })
          .block();

它有效",即结果符合预期.

It 'works', that is the results are as expected.

两个问题:

  1. 这是正确的吗?cache() 是否会导致 when 等待所有 Monos 完成?
  2. 高效吗?有没有办法让它更快?
  1. Is this correct? Does cache() result in the when waiting for all Monos to complete?
  2. Is it efficient? Is there a way to make this faster?

谢谢.

推荐答案

你应该尽可能地尝试:

  • 使用 Reactor 运算符并组成单个反应链
  • 避免将 doOn* 运算符用于副作用(如日志记录)以外的其他用途
  • 避免共享状态
  • use Reactor operators and compose a single reactive chain
  • avoid using doOn* operators for something other than side-effects (like logging)
  • avoid shared state

您的代码可能看起来更像

Your code could look a bit more like

List<MetricConfig> metricConfigs = //...
Mono<List<MetricDataModel>> data = Flux.fromIterable(metricConfigs)
    .flatMap(config -> extractMetrics.queryMetricData(config))
    .collectList();

此外,cache() 操作符不会等待流完成(这实际上是 then() 的工作).

Also, the cache() operator does not wait the completion of the stream (that's actually then()'s job).

这篇关于等待运行 Reactor Mono 实例完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆