异步RestAPIs与RxJava / Jersey2。线程的问题? [英] Asynchronous RestAPIs with RxJava/Jersey2. Threading questions?

查看:1172
本文介绍了异步RestAPIs与RxJava / Jersey2。线程的问题?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们在使用反应式编程原型设计REST API的过程。
如上图所示,我们保持3层同我们在previouse同步API设计中使用;


http://oi59.tinypic.com/339hhki.jpg


    使用Jersey2将处理请求/反序列化JSON并移交给业务层实施
  1. API层。

  2. 服务层,它实现了业务logic.Implemented采用反应式编程(RxJava)

  3. 这是用于维修服务Layer.Since我们使用CouchBase,这将使用CouchBase RxClient持久化操作DAO层。

我的理解的流程如下:

HTTP请求进来,Jersery将处理该请求/从容器线程池解析RequestThread内JSON /反序列化的请求模式。

B)随着Jersey2异步支持,RequestThread将回返回容器线程池,和业务层将在Schedulers.computation执行()调度。

  @Path(/资源)
公共类AsyncUserResource {
@GET
公共无效asyncGet(@Suspended最终AsyncResponse asyncResponse){       观察到的<使用者>用户= userService.getUser(...); //这是使用Schedulers.computation()内的服务实现执行
       user.subscribe(新观察<使用者>(){            @覆盖
            公共无效onCompleted(){            }            @覆盖
            公共无效onerror的(Throwable的E){
                使用ExceptionMappers //处理错误            }            @覆盖
            公共无效onNext(用户用户){
               asyncResponse.resume(用户);            }});
    }
}

C)里面的DAO任何IO操作将使用Schedulers.io()在一个单独的线程中运行这些长期处理操作。

我的问题是:


  1. 在当前实施的DAO /服务,我应该隐藏实现内部schedulars使用(线程)。

如道:

 公共接口的UserDAO {
  公众可观察到的<使用者>的getUser();
}

在执行,难道是很好的做法,指定程序器如下;

 公众可观察到的<使用者>的getUser(){        观察到的<使用者> RET = Observable.create((订户) - > {
            尝试{                 //做DB调用
                 用户u = NULL;
                 subscriber.onNext(U);
                 subscriber.onCompleted();            }赶上(例外五){
                subscriber.onError(E);
            }        });
        返回ret.subscribeOn(Schedulers.io());
}

还是更简单地返回观测和上层将使用particualr程序器相应的?

<醇开始=2>
  • 由于大部分的DAO IO involes /网络电话我假设Schedulars.io()应该被使用。
    如何在一侧的业务逻辑服务层?如果他们内部Schedulers.computation()(事件循环)?被执行


  • 有在JVM.One内的两个线程池是容器线程池,另一个是RxThread池由Schedulers.io()使用。
    如何配置RxJava的池设置/大小?



  • 解决方案

    1)在RxJava本身,如果一个方法需要一个调度器,我们创建了两个重载:一个没有一个Scheduler参数和一个与它。前者然后委托给后者用一个合理的默认调度。这样一来,消费者API可以选择接受默认的或者自己去了。

    2),这取决于你的计算。如果计算花费相似的时间,以等待在IO,可以移动计算到计算调度,从而释放内的IO缓存工作线程做更多的阻塞。否则,你可能只是做同样的调度程序的业务逻辑。

    3)目前不能配置RxJava池的大小。计算总是会使用 Runtime.availableProcessors()和IO将始终作为一个无界的线程池缓存。如果你可以用事件线程跳跃生活(意思是:他们都保证是串行的,但一个事件可以在线程1执行,并在随后的线程2),您可以通过 Schedulers.from使用自己的ExecutorServices ()

    We are in the process of prototyping a REST API using reactive programming. As shown in the diagram, we keep 3 layers same as we used in our previouse sync API designs ;

    http://oi59.tinypic.com/339hhki.jpg

    1. API Layer implemented using Jersey2 which will process request/deserialize JSON and handover to Service Layer.
    2. Service Layer which implements the business-logic.Implemented using reactive programming (RxJava)
    3. Dao Layer which is used for persistence operations by Service Layer.Since we use CouchBase , this will use CouchBase RxClient.

    To my understanding the flow is as follows :

    a) HTTP request comes,Jersery will process the request/parse JSON/deserialize request model inside a RequestThread from the "Container Thread pool".

    b) With Jersey2 Async support , RequestThread will be return back to Container Thread Pool, and the Service Layer will be executed in Schedulers.computation() scheduler.

    @Path("/resource")
    public class AsyncUserResource {
        @GET
        public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
     
           Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation
    
    
           user.subscribe(new Observer<User>() {
    
                @Override
                public void onCompleted() { 
    
                }
    
                @Override
                public void onError(Throwable e) {
                    //handle error using ExceptionMappers
    
                }
    
                @Override
                public void onNext(User user) {
                   asyncResponse.resume(user); 
    
                }});
        }        
    
    
    }
    

    c) Any IO operations inside DAOs will use Schedulers.io() to run these long-processing operations in a separate thread.

    My questions are :

    1. When implementing DAOs/Services , should I hide the schedulars in use (Threading) inside the implementation.

    eg Dao :

    public interface UserDao {
      public Observable<User> getUser();
    }
    

    In the implementation, is it the good practise to specify the Schedular as below;

    public Observable<User> getUser() {
    
            Observable<User> ret = Observable.create((subscriber)->{
                try {
    
                     //Do DB call
                     User u = null;
                     subscriber.onNext(u);
                     subscriber.onCompleted();
    
                }catch (Exception e) {
                    subscriber.onError(e);  
                }
    
            });
            return ret.subscribeOn(Schedulers.io());
    }
    

    Or is it better to simply return the Observable ,and the upper-layer will use a particualr Schedular accordingly ?

    1. Since DAOs mostly involes io/network calls I assume Schedulars.io() should be used. How about for the business-logic in side the Service Layer ? Should them be executed inside Schedulers.computation() (Event Loop) ?

    2. There are two thread pools inside the JVM.One is "Container Thread Pool" and the other is "RxThread Pool" used by Schedulers.io() . How to configure pool settings/size of RxJava ?

    解决方案

    1) In RxJava itself, if an method requires a scheduler, we create two overloads: one without a Scheduler parameter and one with it. The former then delegates to the latter with a reasonable default scheduler. This way, API consumers may chose to accept the default or go with their own.

    2) It depends on your computation. If the computation takes similar time as to wait for the IO, you could move the computation into the computation scheduler, thus freeing up the cached worker threads inside IO to do more blocking. Otherwise, you could just do the business logic on the same scheduler.

    3) You can't configure the pool sizes in RxJava at the moment. Computation will always use Runtime.availableProcessors() and IO will be always act as an unbounded cached threadpool. If you can live with events thread-hopping (meaning: they are guaranteed to be serial but one event may execute on thread 1 and the subsequent on thread 2), you can use your own ExecutorServices via Schedulers.from().

    这篇关于异步RestAPIs与RxJava / Jersey2。线程的问题?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆