响应式编程:Spring WebFlux:如何构建微服务调用链? [英] Reactive Programming: Spring WebFlux: How to build a chain of micro-service calls?

查看:120
本文介绍了响应式编程:Spring WebFlux:如何构建微服务调用链?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Spring Boot 应用:

a @RestController 接收以下有效负载:

a @RestController receives the following payload:

{
  "cartoon": "The Little Mermaid",
  "characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}

我需要按照以下方式处理:

I need to process it in the following way:

  1. 获取每个角色名称的唯一 ID:对cartoon-characters"微服务进行 HTTP 调用,该微服务按名称返回 id
  2. 转换控制器接收到的数据:用上一步从卡通字符"微服务接收到的适当 ID 替换字符名称.<代码>{《卡通》:《小美人鱼》,"characterIds": [1, 2, 3, 4]}

使用转换后的数据向cartoon-db"微服务发送 HTTP POST 请求.

Send an HTTP POST request to "cartoon-db" microservice with transformed data.

我遇到的问题:

我需要使用 Reactive Programming(非阻塞\异步处理)和 Spring WebFlux(Mono|Flux) 和 Spring Reactive WebClient - 但我对该堆栈的经验为零,试图尽可能多地阅读它,加上谷歌搜索,但仍然有一堆悬而未决的问题,例如:

I need to implement all these steps using the paradigm of Reactive Programming (non-blocking\async processing) with Spring WebFlux (Mono|Flux) and Spring Reactive WebClient - but I have zero experience with that stack, trying to read about it as much as I can, plus googling a lot but still, have a bunch of unanswered questions, for example:

第一季度.我已经配置了向卡通人物"微服务发送请求的反应式 webClient:

Q1. I have already configured reactive webClient that sends a request to "cartoon-characters" microservice:

      public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
    return WebClient.builder().baseUrl("http://cartoon-characters").build()
        .get()
        .uri("/character/{characterName}", characterName)
        .retrieve()
        .bodyToMono(Integer.class);
  }

如您所见,我有一个卡通人物名字列表,对于每个名字我都需要调用 getCartoonCharacterIdbyName(String name) 方法,我不确定调用的正确选项它是串联的,相信正确的选择:并行执行.

As you may see, I have got a list of cartoon character names and for each of them I need to call getCartoonCharacterIdbyName(String name) method, I am not sure that the right option to call it in series, believe the right option: parallel execution.

写了如下方法:

  public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
    .flatMap(this::getCartoonCharacterIdbyName);

return StreamSupport.stream(flux.toIterable().spliterator(), false)
    .collect(Collectors.toList());

}

但我怀疑这段代码会并行 WebClient 执行,而且代码调用 flux.toIterable() 会阻塞线程,所以我失去了这个实现非阻塞机制.

but I have doubts, that this code does parallel WebClient execution and also, code calls flux.toIterable() that block the thread, so with this implementation I lost non-blocking mechanism.

我的假设是否正确?

如何将其重写为具有并行性和非阻塞性?

How do I need to rewrite it to having parallelism and non-blocking?

第二季度.在技​​术上是否可以以响应式方式转换控制器接收到的输入数据(我的意思是用 id 替换名称):当我们使用 Flux characterIds 操作时,而不是使用 List<;characterIds 的整数>?

Q2. Is it technically possible to transform input data received by the controller (I mean replace names with ids) in reactive style: when we operate with Flux<Integer> characterIds, but not with the List<Integer> of characterIds?

Q3. 是否有可能在 步骤 2 之后不仅获得转换后的 Data 对象,还获得 Mono<>,可以在 Step 中被另一个 WebClient 使用3?

Q3. Is it potentially possible to get not just transformed Data object, but Mono<> after step 2 that can be consumed by another WebClient in Step 3?

推荐答案

实际上,这是一个很好的问题,因为在链接微服务时,理解 WebFlux 或项目反应器框架需要几个步骤.

Actually it's a good question since understanding the WebFlux, or project reactor framework, when it comes to chaining micro-services requires a couple of steps.

首先要意识到 WebClient 应该接收一个发布者并返回一个发布者.将此推断为 4 个不同的方法签名以帮助思考.

The first is to realize that a WebClient should take a publisher in and return a publisher. Extrapolate this to 4 different method signatures to help with thinking.

  • 单声道 -> 单声道
  • 通量 -> 通量
  • 单声道 -> 通量
  • 通量 -> 单声道

当然,在所有情况下,它只是发布者->发布者,但在您更好地理解之前保留它.前两个是显而易见的,您只需使用 .map(...) 来处理流中的对象,但您需要学习如何处理后两个.如上所述,从 Flux->Mono 可以使用 .collectList().reduce(...) 完成.从 Mono->Flux 开始,似乎通常使用 .flatMapMany.flatMapIterable 或它的一些变体来完成.可能还有其他技术.你永远不应该在任何 WebFlux 代码中使用 .block() ,如果你尝试这样做,通常你会得到一个运行时错误.

For sure, in all cases, it is just Publisher->Publisher, but leave that until you understand things better. The first two are obvious, and you just use .map(...) to handle objects in the flow, but you need to learn how to handle the second two. As commented above, going from Flux->Mono could be done with .collectList(), or also with .reduce(...). Going from Mono->Flux seems to generally be done with .flatMapMany or .flatMapIterable or some variation of that. There are probably other techniques. You should never use .block() in any WebFlux code, and generally you will get a runtime error if you try to do so.

在你的例子中你想要去

  • (Mono->Flux)->(Flux->Flux)->(Flux->Flux)

正如你所说,你想要

  • 单声道->通量->通量

第二部分是了解链接流.你可以这样做

The second part is to understand about chaining Flows. You could do

  • p3(p2(p1(object)));

哪个会链接 p1->p2->p3,但我总是发现创建一个服务层"更容易理解.

Which would chain p1->p2->p3, but I always found it more understandable to make a "Service Layer" instead.

  • o2 = p1(object);
  • o3 = p2(o2);
  • 结果 = p3(o3);

这段代码更易于阅读和维护,并且随着一些成熟度,您会理解该语句的价值.

This code is just much easier to read and maintain and, with some maturity, you come to understand the worth of that statement.

我在你的例子中遇到的唯一问题是用 WebClient 作为 @RequestBody 做一个 Flux.不起作用.请参阅 WebClient bodyToFlux(String.class) 以获取字符串列表不分隔单个值.除此之外,这是一个非常简单的应用程序.当你调试它时,你会发现它在到达 Flux 之前到达了 .subscribe(System.out::println) 行.ids = mapNamesToIds(fn) 行.这是因为 Flow 在执行之前就已经设置好了.理解这一点需要一段时间,但这是项目反应器框架的重点.

The only problem I had with your example was doing a Flux<String> with WebClient as a @RequestBody. Doesn't work. See WebClient bodyToFlux(String.class) for string list doesn't separate individual values. Other than that, it's a pretty straightforward application. You'll find when you debug it that it gets to the .subscribe(System.out::println) line before it gets to the Flux<Integer> ids = mapNamesToIds(fn) line. This is because the Flow is setup before it is executed. Takes a while to understand this but it is the point of the project reactor framework.

@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    Map<Integer, CartoonCharacter> characters;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
        characters = Arrays.asList( new CartoonCharacter[] {
                new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"), 
                new CartoonCharacter(names[1].hashCode(), names[1], "Human"), 
                new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"), 
                new CartoonCharacter(names[3].hashCode(), names[3], "Fish")} 
        )
        .stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
        // TODO Auto-generated method stub
        CartoonRequest cr = CartoonRequest.builder()
        .cartoon("The Little Mermaid")
        .characterNames(Arrays.asList(names))
        .build();
        thisLocalClient
            .post()
            .uri("cartoonDetails")
            .body(Mono.just(cr), CartoonRequest.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class)
            .subscribe(System.out::println);
    }

    @Bean
    WebClient localClient() {
        return WebClient.create("http://localhost:8080/demo/");
    }

    @Autowired
    WebClient thisLocalClient;

    @PostMapping("cartoonDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
        Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
        Flux<Integer> ids = mapNamesToIds(fn);
        Flux<CartoonCharacter> details = mapIdsToDetails(ids);
        return details;
    }
    //  Service Layer Methods
    private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
        return thisLocalClient
            .post()
            .uri("findIds")
            .body(names, StringWrapper.class)
            .retrieve()
            .bodyToFlux(Integer.class);
    }
    private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
        return thisLocalClient
            .post()
            .uri("findDetails")
            .body(ids, Integer.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class);
    }
    // Services
    @PostMapping("findIds")
    Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
        return names.map(name->name.getString().hashCode());
    }
    @PostMapping("findDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
        return ids.map(characters::get);
    }
}

还有:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
    private String string;
}
@Data
@Builder
public class CartoonRequest {
    private String cartoon;
    private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
    Integer id;
    String name;
    String species;
}

这篇关于响应式编程:Spring WebFlux:如何构建微服务调用链?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆