在项目反应堆中包装阻塞I / O. [英] Wrapping blocking I/O in project reactor

查看:118
本文介绍了在项目反应堆中包装阻塞I / O.的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个spring-webflux API,它在服务层需要从使用JDBC的现有存储库中读取。

I have a spring-webflux API which, at a service layer, needs to read from an existing repository which uses JDBC.

对这个主题做了一些阅读,我想将阻塞数据库调用的执行与其他非阻塞异步代码分开。

Having done some reading on the subject, I would like to keep the execution of the blocking database call separate from the rest of my non-blocking async code.

我已经定义了一个专用的jdbcScheduler:

I have defined a dedicated jdbcScheduler:

@Bean
public Scheduler jdbcScheduler() {
    return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}

以及使用它的AsyncWrapper实用程序:

And an AsyncWrapper utility to use it:

@Component
public class AsyncJdbcWrapper {

    private final Scheduler jdbcScheduler;

    @Autowired
    public AsyncJdbcWrapper(Scheduler jdbcScheduler) {

        this.jdbcScheduler = jdbcScheduler;
    }

    public <T> Mono<T> async(Callable<T> callable) {
        return Mono.fromCallable(callable)
                .subscribeOn(jdbcScheduler)
                .publishOn(Schedulers.parallel());
    }
}

然后用于包装jdbc调用,如下所示:

Which is then used to wrap jdbc calls like so:

Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
                .map(userOption -> userOption.map(u -> u.getId())
                        .orElseThrow(() -> new IllegalArgumentException("Unable to find user with ID " + request.getUserId())));

我有两个问题:

1)我是否正确地将阻塞调用的执行推送到另一组线程?对于这些东西相当新,我正在努力解决subscribeOn()/ publishOn()的错综复杂问题。

1) Am I correctly pushing the execution of blocking calls to another set of threads? Being fairly new to this stuff I'm struggling with the intricacies of subscribeOn()/publishOn().

2)说我想利用生成的单声道,例如,使用userIdMono的结果调用API,在哪个调度程序上执行?专门为jdbc调用创建的那个,或者反应堆通常在其中运行的主(?)线程?例如。

2) Say I want to make use of the resulting mono, e.g call an API with the result of the userIdMono, on which scheduler will that be executed? The one specifically created for the jdbc calls, or the main(?) thread that reactor usually operates within? e.g.

userIdMono.map(id -> someApiClient.call(id));


推荐答案

1)使用 subscribeOn 正确地将JDBC工作放在 jdbcScheduler

1) Use of subscribeOn is correctly putting the JDBC work on the jdbcScheduler

2)结果 Callable - 在jdbcScheduler上计算时,是 publishOn parallel Scheduler,所以你的 map 将在 Schedulers.parallel()池的一个线程上执行(而不是而不是占用 jdbcScheduler )。

2) Neither, the results of the Callable - while computed on the jdbcScheduler, are publishOn the parallel Scheduler, so your map will be executed on a thread from the Schedulers.parallel() pool (rather than hogging the jdbcScheduler).

这篇关于在项目反应堆中包装阻塞I / O.的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆