Spring 反应式:混合 RestTemplate &网络客户端 [英] Spring reactive : mixing RestTemplate & WebClient

查看:60
本文介绍了Spring 反应式:混合 RestTemplate &网络客户端的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个端点:/parent/child/{parentId}
我需要返回所有 Child

public class Parent {
  private long id;
  private Child child;
}

public class Child {
  private long childId;
  private String someAttribute;
}

但是,调用 /child/{parentId} 很慢,所以我尝试这样做:

However, call to /child/{parentId} is quite slow, so Im trying to do this:

  1. 调用/parent获取100个父数据,使用异步RestTemplate
  2. 对于每一个父数据,调用/child/{parentId}获取详细信息
  3. /child/{parentId}的结果调用添加到resultList
  4. 当对 /child/{parentId} 的 100 次调用完成后,返回 resultList
  1. Call /parent to get 100 parent data, using asynchronous RestTemplate
  2. For each parent data, call /child/{parentId} to get detail
  3. Add the result call to /child/{parentId} into resultList
  4. When 100 calls to /child/{parentId} is done, return resultList

我使用包装类,因为大多数端点以格式返回 JSON :

I use wrapper class since most endpoints returns JSON in format :

{
  "next": "String",
  "data": [
    // parent goes here
  ]
}

所以我把它包裹在这个

public class ResponseWrapper<T> {
  private List<T> data;
  private String next;
}

我写了这段代码,但是 resultList 总是返回空元素.实现这一目标的正确方法是什么?

I wrote this code, but the resultList always return empty elements. What is the correct way to achieve this?

public List<Child> getAllParents() {
    var endpointParent = StringUtils.join(HOST, "/parent");
    var resultList = new ArrayList<Child>();

    var responseParent = restTemplate.exchange(endpointParent, HttpMethod.GET, httpEntity,
            new ParameterizedTypeReference<ResponseWrapper<Parent>>() {
            });

    responseParent.getBody().getData().stream().forEach(parent -> {
        var endpointChild = StringUtils.join(HOST, "/child/", parent.getId());
        // async call due to slow endpoint child
        webClient.get().uri(endpointChild).retrieve()
                .bodyToMono(new ParameterizedTypeReference<ResponseWrapper<Child>>() {
                }).map(wrapper -> wrapper.getData()).subscribe(children -> {
                    children.stream().forEach(child -> resultList.add(child));
                });
    });

    return resultList;
}

推荐答案

在响应式类型上调用 subscribe 开始处理但立即返回;此时您无法保证处理已完成.因此,当您的代码段调用 return resultList 时,WebClient 可能仍在忙于获取东西.

Calling subscribe on a reactive type starts the processing but returns immediately; you have no guarantee at that point that the processing is done. So by the time your snippet is calling return resultList, the WebClient is probably is still busy fetching things.

您最好放弃异步 resttemplate(现在已弃用,取而代之的是 WebClient)并构建一个单一的管道,例如:

You're better off discarding the async resttemplate (which is now deprecated in favour of WebClient) and build a single pipeline like:

public List<Child> getAllParents() {
    var endpointParent = StringUtils.join(HOST, "/parent");
    var resultList = new ArrayList<Child>();

   Flux<Parent> parents = webClient.get().uri(endpointParent)
            .retrieve().bodyToMono(ResponseWrapper.class)
            .flatMapMany(wrapper -> Flux.fromIterable(wrapper.data));

    return parents.flatMap(parent -> {
      var endpointChild = StringUtils.join(HOST, "/child/", parent.getId());
      return webClient.get().uri(endpointChild).retrieve()
                .bodyToMono(new ParameterizedTypeReference<ResponseWrapper<Child>>() {
                }).flatMapMany(wrapper -> Flux.fromIterable(wrapper.getData()));
    }).collectList().block();
}

默认情况下,parents.flatMap 操作符将处理具有某种并发性的元素(我相信默认为 16).您可以通过使用选定的并发值调用 Flux.flatMap 运算符的另一个变体来选择不同的值.

By default, the parents.flatMap operator will process elements with some concurrency (16 by default I believe). You can choose a different value by calling another variant of the Flux.flatMap operator with a chosen concurrency value.

这篇关于Spring 反应式:混合 RestTemplate &amp;网络客户端的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆