JDBC与Webflux - 如何分派到容器线程 [英] JDBC with Webflux - how to dispatch to container thread

查看:154
本文介绍了JDBC与Webflux - 如何分派到容器线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用webflux进行小型概念验证。在我的应用程序的一部分中,我想与数据库(通过JDBC)进行通信,该数据库阻塞并且不适合反应堆。然而,对于这个概念验证,我正在考虑以下技巧:

I am working on a small proof-of-concept with webflux. In one part of my application I would like to communicate with a database (via JDBC) which is blocking and is not a good fit for reactor. Nevertheless for this proof-of-concept, I am thinking of following trick:


  1. 定义专用线程池(让我们称之为 DBThreadPool )as ExecutorService ,其中固定数量的线程等于JDBC连接池大小。在Reactor Scheduler中包装该池(名为 dbScheduller

  2. 换行阻塞调用(如Reactor doc中所述)并安排它on dbScheduller

  1. Define a dedicated thread pool (let's call it DBThreadPool) as ExecutorService with a fixed number of threads that equal JDBC connection pool size. Wrap that pool in Reactor Scheduler (named dbScheduller)
  2. Wrap blocking call (like described in Reactor doc) and schedule it on dbScheduller:

public Flux<DbUser> allUsers() {
    return Mono.fromCallable(() -> <jdbcQueryHere>)
        .flatMapIterable(Function.identity())
        .log("DB-OPER").subscribeOn(dbScheduller);
}


  • 一旦查询完成,我想处理返回的 Flux< DbUser> 与一些反应运算符但在容器线程中执行它们,以便我可以释放数据库线程。根据Reactor doc,它可以通过 publishOn 方法完成:

  • As soon as the query is done, I would like to process the returned Flux<DbUser> with some reactive operators but perform them in a container thread so I can release the DB thread. According to Reactor doc it could be done via publishOn method:

    public Mono<ServerResponse> allUsers(ServerRequest request) {
        return ServerResponse.ok()
        .contentType(APPLICATION_STREAM_JSON)
        .body(
            usersDao.allUsers()
            .publishOn(<netty Thread pool as scheduller>)
            .map(<some function>)
            .filter(<some predicate>), 
        DbUser.class);
    }
    


  • 你认为这个吗?是一种可行的方法(直到我们得到数据库的非阻塞驱动程序)?

    Do you think this is a viable approach (until we get non-blocking drivers to databases)?

    如何访问容器(netty)线程池,以便可以利用其中一个线程来完成(从DB处理后数据并写入响应)HTTP请求?

    How to get access to container (netty) thread pool so one of its threads can be utilised to finalise (post-process data from DB and write to response) a HTTP request?

    我知道我可以单独使用数据库线程(通过省略 publishOn )来完成HTTP请求,但是我想尽快释放它(因此它可以被另一个也需要访问DB的请求重用)并将剩下的工作(这可能是耗时的)留给netty托管线程(它可以是一个最初执行我的处理程序方法)。

    I know I can use DB thread by itself (by omitting publishOn) for finalising a HTTP request, but I would like to release it as soon as possible (so it can be reused by another request that also needs access to DB) and leave rest of the work (which potentially can be time consuming) to a netty managed thread (it can be the one that originally executed my handler method).

    推荐答案


    您认为这是一种可行的方法(直到我们获得数据库的非阻塞驱动程序)?

    Do you think this is a viable approach (until we get non-blocking drivers to databases)?

    我个人认为这是一种有效的方法。 许多其他人(他们中的大多数人比我更了解反应性编程)怀疑这是一个好主意。

    I personally do think this is a valid approach. BUT many others (most of them know much more about reactive programming than I do) doubt this is a good idea.

    您的替代方案:


    • 为阻塞数据库调用编写单独的服务,并使用 WebClient

    Pro:清除分离阻塞代码和非阻塞代码。

    Pro: clean separation of blocking and non-blocking code.

    Con:还有一个服务边界,包括网络。

    Con: One more service boundary including the network.

    为你的阻止写一个单独的服务数据库调用包括自己的用户界面。

    write a separate service for your blocking DB calls including its own user interface.

    专业版:没有内部服务通信。

    Pro: No inter service communication.

    Con:降低百分比您的应用程序实际上是被动的。

    Con: Reduces the percentage of your application that is actually reactive.

    如果您想在此服务中保留阻止代码,这是正确的方法。

    if you want to keep the blocking code in this service, this is the correct way to do it.


    如何访问容器(netty)线程池?

    How to get access to container (netty) thread pool?

    如果您只是渲染响应,则不需要,因为这会自动发生在netty线程上。

    If you are just rendering your response you don't need to, because that happens on the netty thread automagically.

    如果您要进行实际转换,则没有任何现成的做法。我想可以使用一些Netty API并基于此实现 Scheduler 。但我对Netty的帮助不够了解。

    If you have actual transformations to do, there is nothing ready-made to do this. I guess one could use some Netty API and implement a Scheduler based on that. But I don't know enough about Netty to help with that.

    为了回答后续问题,我有点问自己:

    And to answer the follow-up question I kind of asked myself:


    但这不是一个巨大的遗漏吗?

    But isn't that a gigantic omission?

    publishOn 方法并非真正用于此目的。它们的用例更多地用于例如Swing应用程序,你在管道的末尾有使用某个线程或线程池的要求。

    The publishOn methods weren't really intended for this. Their use-case is more along use in e.g. Swing applications where you have at the end of a pipeline the requirement to use a certain thread or thread pool.

    我个人的感觉是Reactor应该有这样的东西,但是开发人员目前不相信这是一个好主意。所以我建议如果你认为它对你的应用程序真的有意义:打开一张票并准备好争辩你的观点。要么你成功了,要么我们都知道为什么这是个坏主意: - )

    My personal feeling is Reactor should have something like this, but the developers are currently not convinced that this is a good idea. So I suggest if you think it really makes sense for your application: open a ticket and be prepared to argue your point. Either you succeed or we both learn why it is a bad idea :-)


    还有别的吗?

    Anything else?

    很高兴你问你是否在一个线程池中阻塞东西,否则你应该考虑如何在线程池(或相应的资源)处理场景)它被重载,即它处理的事件比它们生成的要慢。

    Glad you asked if you're doing blocking stuff on a thread pool in an otherwise reactive pipeline you should think how to handle the scenario when the thread pool (or the respective resource) is overloaded i.e. it processes events slower than they get produced.

    这篇关于JDBC与Webflux - 如何分派到容器线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆