How to limit the number of active Spring WebClient calls


Problem description

I have a requirement where I read a bunch of rows (thousands) from a SQL DB using Spring Batch and call a REST Service to enrich content before writing them on a Kafka topic.

When using the Spring Reactive webClient, how do I limit the number of active non-blocking service calls? Should I somehow introduce a Flux in the loop after I read data using Spring Batch?

(I understand the usage of delayElements and that it serves a different purpose, as when a single Get Service Call brings in a lot of data and you want the server to slow down -- here though, my use case is a bit different in that I have many WebClient calls to make and would like to limit the number of calls to avoid out of memory issues but still gain the advantages of non-blocking invocations).

Recommended answer

Very interesting question. I pondered it and came up with a couple of ideas on how this could be done. I will share my thoughts, and hopefully some of them help you with your investigation.

Unfortunately, I'm not familiar with Spring Batch. However, this sounds like a problem of rate limiting, or the classical producer-consumer problem.

So, we have a producer that produces so many messages that our consumer cannot catch up, and the buffering in the middle becomes unbearable.

The problem I see is that your Spring Batch process, as you describe it, is not working as a stream or pipeline, but your reactive Web client is.

So, if we were able to read the data as a stream, then as records start getting into the pipeline they would get processed by the reactive web client and, using back pressure, we could control the flow of the stream from the producer/database side.

Producer side

So, the first thing I would change is how records get extracted from the database. We need to control how many records get read from the database at a time, either by paging our data retrieval or by controlling the fetch size, and then, with back pressure, control how many of those are sent downstream through the reactive pipeline.

So, consider the following (rudimentary) database data retrieval, wrapped in a Flux.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

import reactor.core.publisher.Flux;

Flux<String> getData(DataSource ds) {
    return Flux.create(sink -> {
        try {
            Connection con = ds.getConnection();
            con.setAutoCommit(false);
            // Note: the two-argument prepareStatement overload takes an
            // auto-generated-keys flag, not a result set type, so the result
            // set type and concurrency must be passed together.
            PreparedStatement stm = con.prepareStatement(
                    "SELECT order_number FROM orders WHERE order_date >= '2018-08-12'",
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stm.setFetchSize(1000);
            ResultSet rs = stm.executeQuery();

            // Emit only as many rows as the subscriber has requested;
            // batchSize is a long because demand can be Long.MAX_VALUE.
            sink.onRequest(batchSize -> {
                try {
                    for (long i = 0; i < batchSize; i++) {
                        if (!rs.next()) {
                            //no more data, close resources!
                            rs.close();
                            stm.close();
                            con.close();
                            sink.complete();
                            break;
                        }
                        sink.next(rs.getString(1));
                    }
                } catch (SQLException e) {
                    //TODO: close resources here
                    sink.error(e);
                }
            });
        }
        catch (SQLException e) {
            //TODO: close resources here
            sink.error(e);
        }
    });
}

In the example above:

  • I control the number of records read per batch to be 1000 by setting a fetch size.
  • The sink emits the number of records requested by the subscriber (i.e. batchSize) and then waits for it to request more using back pressure.
  • When there are no more records in the result set, we complete the sink and close the resources.
  • If an error occurs at any point, we send back the error and close the resources.
  • Alternatively, I could have used paging to read the data, which would probably simplify the handling of resources by reissuing a query at every request cycle (see the sketch after this list).
  • You may also want to do something when the subscription is cancelled or disposed (sink.onCancel, sink.onDispose), since closing the connection and other resources is fundamental here.
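
For the paging alternative mentioned in the list, a minimal sketch could look like the following, assuming a LIMIT/OFFSET-style SQL dialect. The getDataPaged and fetchPage names, the ORDER BY column, and the page-size handling are illustrative assumptions, not part of the original answer.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

import reactor.core.publisher.Flux;

// Reissue a query for one page per request cycle, so each page opens and
// closes its own connection. LIMIT/OFFSET and the hypothetical fetchPage
// helper are assumptions for illustration.
Flux<String> getDataPaged(DataSource ds, int pageSize) {
    return Flux.<List<String>, Integer>generate(
            () -> 0,
            (page, sink) -> {
                List<String> rows = fetchPage(ds, page, pageSize);
                if (rows.isEmpty()) {
                    sink.complete();   // empty page: no more data
                } else {
                    sink.next(rows);   // emit one page per cycle
                }
                return page + 1;
            })
        .concatMap(Flux::fromIterable);
}

List<String> fetchPage(DataSource ds, int page, int pageSize) {
    String sql = "SELECT order_number FROM orders WHERE order_date >= '2018-08-12' "
            + "ORDER BY order_number LIMIT ? OFFSET ?";
    List<String> rows = new ArrayList<>();
    // try-with-resources closes everything after each page is read.
    try (Connection con = ds.getConnection();
         PreparedStatement stm = con.prepareStatement(sql)) {
        stm.setInt(1, pageSize);
        stm.setInt(2, page * pageSize);
        try (ResultSet rs = stm.executeQuery()) {
            while (rs.next()) {
                rows.add(rs.getString(1));
            }
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
    return rows;
}

The trade-off is one short-lived query per page instead of a single open cursor, which keeps resource handling simple (try-with-resources per page) at the cost of repeated queries and a dependence on a stable ORDER BY.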

Consumer side

On the consumer side, you register a subscriber that requests messages in batches of 1000 at a time, and only requests more once it has processed the current batch.

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

getData(source).subscribe(new BaseSubscriber<String>() {

    private int messages = 0;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Kick off the stream by requesting the first batch of 1000.
        subscription.request(1000);
    }

    @Override
    protected void hookOnNext(String value) {
        //make http request
        System.out.println(value);
        messages++;
        if(messages % 1000 == 0) {
            //when we're done with a batch
            //then we're ready to request for more
            upstream().request(1000);
        }
    }
});

In the example above, when the subscription starts it requests the first batch of 1000 messages. In onNext we process that batch, making HTTP requests using the Web client.

Once the batch is complete, we request another batch of 1000 from the publisher, and so on.

And there you have it! Using back pressure, you control how many open HTTP requests you have at a time.
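
As a final sketch, the //make http request placeholder could be a WebClient call, and if you prefer to let Reactor manage the demand rather than hand-rolling a BaseSubscriber, the concurrency argument of flatMap bounds the number of in-flight calls. The base URL, the /enrich/{order} endpoint, and the dataSource variable below are assumptions for illustration; getData is the Flux built earlier.

import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;

WebClient client = WebClient.create("http://enrichment-service");

// flatMap's concurrency argument caps how many inner publishers are
// subscribed at once, so at most 100 HTTP calls are active at a time;
// back pressure propagates upstream to the database Flux.
Flux<String> enriched = getData(dataSource)
        .flatMap(order -> client.get()
                .uri("/enrich/{order}", order)
                .retrieve()
                .bodyToMono(String.class), 100);

enriched.subscribe(System.out::println);

This gives the same cap on open HTTP requests as the manual subscriber, with Reactor doing the request bookkeeping for you.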

My example is very rudimentary and will require some extra work to make it production ready, but I hope it offers some ideas that can be adapted to your Spring Batch scenario.
