在 Spring Webflux 中执行阻塞 JDBC 调用 [英] Execute blocking JDBC call in Spring Webflux

查看:148
本文介绍了在 Spring Webflux 中执行阻塞 JDBC 调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我将 Spring Webflux 与 Spring 数据 jpa 一起使用,使用 PostgreSql 作为后端数据库.我不想在进行诸如 findsave 之类的数据库调用时阻塞主线程.为了实现相同的目的,我在 Controller 类和 jdbcScheduler 服务类中有一个主调度程序.

I am using Spring Webflux with Spring data jpa using PostgreSql as backend db. I don't want to block the main thread while making db calls like find and save. To achieve the same, I have a main scheduler in Controller class and a jdbcScheduler service classes.

我定义它们的方式是:

@Configuration
@EnableJpaAuditing
public class CommonConfig {

    @Value("${spring.datasource.hikari.maximum-pool-size}")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() {
        return Schedulers.parallel();
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }
}

现在,在我的服务层中执行 get/save 调用时:

Now, while doing a get/save call in my service layer I do:

    @Override
    public Mono<Config> getConfigByKey(String key) {
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
        return Flux
            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> addConfigs(List<Config> configList) {
        return Flux.fromIterable(configRepository.saveAll(configList))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

在控制器中,我这样做:

And in controller, I do:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    }

这是正确的吗?和/或有更好的方法来做到这一点?

Is this correct? and/or there is a way better way to do it?

我的理解:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);

是该任务将在 jdbcScheduler 线程上运行,随后的结果将发布在我的主并行 scheduler 上.这种理解是否正确?

is that task will run on jdbcScheduler threads and later result will be published on my main parallel scheduler. Is this understanding correct?

推荐答案

您对 publishOnsubscribeOn (请参阅反应堆项目中有关这些运算符的参考文档).

如果您调用阻塞库而不调度它在特定调度程序上工作,这些调用将阻塞少数可用线程之一(默认情况下,Netty 事件循环),并且您的应用程序将只能同时处理少数请求.

If you call blocking libraries without scheduling that work on a specific scheduler, those calls will block one of the few threads available (by default, the Netty event loop) and your application will only be able to serve a few requests concurrently.

现在我不确定你这样做的目的是什么.

Now I'm not sure what you're trying to achieve by doing that.

首先,并行调度器是为 CPU 密集型任务设计的,意思是您将拥有的数量很少,与 CPU 内核一样多(或更多).在这种情况下,这就像将线程池大小设置为常规 Servlet 容器上的核心数.您的应用将无法处理大量并发请求.

First, the parallel scheduler is designed for CPU bound tasks, meaning you'll have few of them, as many (or a bit more) as CPU cores. In this case, it's like setting your threadpool size to the number of cores on a regular Servlet container. Your app won't be able to process a large number of concurrent requests.

即使您选择了更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环,后者是 Spring WebFlux 中本地调度请求处理的地方.

Even if you choose a better alternative (like the elastic Scheduler), it will be still not as good as the Netty event loop, which is where request processing is scheduled natively in Spring WebFlux.

如果您的最终目标是性能和可扩展性,那么在反应式应用程序中包装阻塞调用的性能可能比您的常规 Servlet 容器差.

If your ultimate goal is performance and scalability, wrapping blocking calls in a reactive app is likely to perform worse than your regular Servlet container.

您可以改为使用 Spring MVC 和:

You could instead use Spring MVC and:

  • 在处理阻塞库(如 JPA)时使用通常的阻塞返回类型
  • 当您不依赖于此类库时,请使用 MonoFlux 返回类型
  • use usual blocking return types when you're dealing with a blocking library, like JPA
  • use Mono and Flux return types when you're not tied to such libraries

这不会是非阻塞的,但这仍然是异步的,您将能够并行完成更多工作而无需处理复杂性.

This won't be non-blocking, but this will be asynchronous still and you'll be able to do more work in parallel without dealing with the complexity.

这篇关于在 Spring Webflux 中执行阻塞 JDBC 调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆