将流式流量的WebClient帖子拆分为JSON数组 [英] Splitting a WebClient Post of a Streaming Flux into JSON Arrays

查看:157
本文介绍了将流式流量的WebClient帖子拆分为JSON数组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用第三方REST控制器,该控制器接受JSON对象数组并返回单个对象响应.当我使用有限的Flux从WebClient进行发布时,代码有效(我想是因为Flux完成了.)

I am using a third-party REST controller which accepts an array of JSON objects and returns a single object response. When I POST from a WebClient using a limited Flux the code works (I assume, because the Flux completes).

但是,当Flux可能不受限制时,我该怎么办;

However, when the Flux is potentially unlimited, how do I;

  1. 以大块数组形式发布?
  2. 每个POST数组捕获响应吗?
  3. 停止传输Flux吗?
  1. POST in chunks of arrays?
  2. Capture the response, per POSTed array?
  3. Stop the transmission of the Flux?

这是我的豆子

public class Car implements Serializable {

    Long id;

    public Car() {}
    public Car(Long id) { this.id = id; }
    public Long getId() {return id; }
    public void setId(Long id) { this.id = id; }
}

这是我假设第三方客户端的外观;

This is how I assume that the third-party client looks like;

@RestController
public class ThirdPartyServer {

    @PostMapping("/cars")
    public CarResponse doCars(@RequestBody List<Car> cars) {
        System.err.println("Got " + cars);
        return new CarResponse("OK");
    }
}

这是我的代码.当我发布flux2时,在完成时会发送一个JSON数组.但是,当我发布flux1时,第一个take(5)之后什么也没有发送.如何发布接下来的5个块?

And here is my code. When I POST flux2 , on completion a JSON array is sent. However, when I POST flux1, nothing is sent after the first take(5). How do POST the next chunks of 5?

@Component
public class MyCarClient {

    public void sendCars() {

//      Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
        Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));

        WebClient client = WebClient.create("http://localhost:8080");
        client
            .post()
            .uri("/cars")
            .contentType(MediaType.APPLICATION_JSON)
            .body(flux2, Car.class) 
//          .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
            .exchange()
            .subscribe(r -> System.err.println(r.statusCode()));
    }
}

推荐答案

  1. 如何在大块数组中过帐?

使用

  1. 如何根据每个POST数组捕获响应?

您可以在.exchange()之后通过运算符分析响应.

You can analyze the response via operators after .exchange().

在我提供的示例中,可以在doOnNext运算符中看到响应,但是您可以使用对onNext信号进行运算的任何运算符,例如maphandle.

In the example I provided, the response can be seen in the doOnNext operator, but you can use any operator that operates on onNext signals, such as map or handle.

请务必完整阅读响应正文,以确保将连接返回到池中(请参阅

Be sure to read the response body fully to ensure the connection is returned back to the pool (see note). Here, I have used .bodyToMono, but any .body or .toEntity method will work.

  1. 停止传递助焊剂吗?

在完成使用subscribe方法后,可以使用返回的disposable.dispose()停止流.

When using the subscribe method as you have done, you can stop the flow using the returned disposable.dispose().

或者,您可以从sendCars()方法返回Flux并将委派和处置委托给调用方.

Alternatively, you can return the Flux from the sendCars() method and delegate the subscription and disposing to the caller.

请注意,在我提供的示例中,我只是使用Thread.sleep()来模拟等待.在实际的应用程序中,您应该使用更高级的内容,并避免使用Thread.sleep()

Note that in the example I provided, I just used Thread.sleep() to simulate waiting. In a real application, you should use something more advanced, and avoid Thread.sleep()

这篇关于将流式流量的WebClient帖子拆分为JSON数组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆