RxJava Combine Sequence Of Requests

The Problem

I have two APIs. API 1 gives me a list of items, and API 2 gives me more detailed information for each of the items I got from API 1. The way I have solved it so far results in bad performance.

The Question

I am looking for an efficient and fast solution to this problem with the help of Retrofit and RxJava.

My Approach

At the moment my solution looks like this:

Step 1: Retrofit executes Single<ArrayList<Information>> from API 1.

Step 2: I iterate through these items and make a request to API 2 for each one.

Step 3: Retrofit sequentially executes a Single<ExtendedInformation> for each item.

Step 4: After all calls to API 2 have completed, I create a new object for each item, combining the Information and the ExtendedInformation.

My Code

 public void addExtendedInformations(final Information[] informations) {
        final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
        final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
            @Override
            public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
                informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
                if (informationDetailArrayList.size() >= informations.length){
                    listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
                }
            }
        };

        for (Information information : informations) {
            getRatingsByTitle(ratingRequestListener, information);
        }
    }

    public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
        Single<ExtendedInformation> repos = service.findForTitle(information.title);
        disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
            @Override
            public void onSuccess(ExtendedInformation extendedInformation) {
                    ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }

            @Override
            public void onError(Throwable e) {
                // Fall back to an empty ExtendedInformation so the item still counts as finished.
                ExtendedInformation extendedInformation = new ExtendedInformation();
                ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }
        }));
    }

    public interface RatingRequestListener {

        void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

    }

Solution

tl;dr: use concatMapEager or flatMap, and execute the sub-calls asynchronously or on a scheduler.


long story

I'm not an Android developer, so my answer will be limited to pure RxJava (version 1 and version 2).

If I get the picture right, the needed flow is:

some query param 
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item 1
          |-> Execute query for item 2 on API_2 -> extended info of item 2
          |-> Execute query for item 3 on API_2 -> extended info of item 3
          ...
          \-> Execute query for item n on API_2 -> extended info of item n
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param

Assuming Retrofit generated the clients for:

interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

If the order of the items is not important, then flatMap alone is enough:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

But this only runs the calls concurrently if the Retrofit builder is configured with one of the following:

  • The async adapter (calls will be queued in OkHttp's internal executor). I personally think this is not a good idea, because you don't have control over this executor.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync())

  • The scheduler-based adapter (calls will be scheduled on an RxJava scheduler). This would be my preferred option, because you explicitly choose which scheduler is used. It will most likely be the IO scheduler, but you are free to try a different one.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))

The reason is that flatMap subscribes to each observable created by api2.extendedInfo(...) and merges them into the resulting observable, so results appear in the order they are received, not in the order of the original list.
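
To make the "order they are received" point concrete, here is a minimal sketch in plain JDK code (CompletableFuture, deliberately not RxJava, so it runs without any dependency). The class and method names and the "ext-" prefix are made up for illustration. Each pending future stands in for one api2.extendedInfo call, and the futures are completed by hand in a chosen order to imitate responses coming back out of order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: a JDK analogue of merge semantics, not RxJava code.
final class MergeOrderDemo {

    /**
     * Creates one pending "request" per id, then completes them in finishOrder.
     * Returns the results in the order they arrived, as a merge would emit them.
     */
    static List<String> arrivalOrder(List<String> ids, List<String> finishOrder) {
        List<String> arrived = new ArrayList<>();
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        for (String id : ids) {
            CompletableFuture<String> request = new CompletableFuture<>();
            // The callback runs when the request completes, whatever its position in ids.
            request.thenAccept(arrived::add);
            pending.put(id, request);
        }
        // Simulate responses coming back in a different order than they were requested.
        for (String id : finishOrder) {
            pending.get(id).complete("ext-" + id);
        }
        return arrived;
    }
}
```

Requesting a, b, c while the responses come back as c, a, b yields the results in the c, a, b order, which is the behaviour to expect from flatMap once the inner calls really run concurrently.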

If the Retrofit client is not set to be async or to run on a scheduler, it is possible to set one locally:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

This structure is almost identical to the previous one, except that it indicates locally on which scheduler each api2.extendedInfo call is supposed to run.

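It is possible to tune the maxConcurrency parameter of flatMap to control how many requests you want to perform at the same time. I'd be cautious on this one: you usually don't want to run all queries at the same time. Note that the default depends on the type in RxJava 2: Flowable.flatMap defaults to 128 (the buffer size), whereas Observable.flatMap, as far as I can tell, defaults to Integer.MAX_VALUE, so with Observable an explicit limit is worth considering.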
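
As a rough illustration of what a concurrency cap buys you, the sketch below uses a plain JDK fixed thread pool as a stand-in for maxConcurrency and records the highest number of simulated requests in flight at once. This is an analogue only and not how flatMap implements its limit; the class name, the method name and the 5 ms sleep are assumptions made for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: bounded concurrency with a fixed pool, not RxJava code.
final class BoundedConcurrencyDemo {

    /** Runs requestCount simulated requests and returns the peak number in flight. */
    static int peakInFlight(int requestCount, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> requests = new ArrayList<>();
        for (int i = 0; i < requestCount; i++) {
            requests.add(CompletableFuture.runAsync(() -> {
                // Record how many requests are running right now.
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                simulateNetworkCall();
                inFlight.decrementAndGet();
            }, pool));
        }
        // Wait for every request before reading the peak.
        requests.forEach(CompletableFuture::join);
        pool.shutdown();
        return peak.get();
    }

    private static void simulateNetworkCall() {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With 20 requests and a cap of 4, the peak never exceeds 4, so the remote API sees at most four calls at a time however long the item list is.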

Now, if the order of the original query matters: concatMap is the operator that does the same thing as flatMap but in order, one inner observable at a time, which turns out to be slow if the code needs to wait for all sub-queries to be performed. The solution is to go one step further with concatMapEager: it subscribes to the observables eagerly, in order, and buffers the results as needed so that they are still emitted in the original order.

Assuming the Retrofit clients are async or run on a specific scheduler:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .concatMapEager(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

Or if the scheduler has to be set locally:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

It is also possible to tune the concurrency of this operator: concatMapEager has an overload that takes maxConcurrency and prefetch parameters.
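
The eager-but-ordered behaviour can be sketched with plain JDK futures (again an analogue, not RxJava): every simulated request is started immediately, and the results are then collected in source order, so a slow first item delays emission but never changes the order. The class and method names, the "ext-" prefix and the artificial delays are all assumptions for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: start everything eagerly, read results in source order.
final class EagerOrderedDemo {

    /** Stand-in for api2.extendedInfo(id); the delay imitates network latency. */
    static String fetchExtended(String id, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ext-" + id;
    }

    static List<String> fetchAllInSourceOrder(List<String> ids) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            String id = ids.get(i);
            // Earlier items are made slower on purpose, so they tend to finish last.
            long delay = 10L * (ids.size() - i);
            inFlight.add(CompletableFuture.supplyAsync(() -> fetchExtended(id, delay)));
        }
        // Joining in list order restores the source order regardless of finish order.
        List<String> ordered = new ArrayList<>();
        for (CompletableFuture<String> request : inFlight) {
            ordered.add(request.join());
        }
        return ordered;
    }
}
```

The total time is roughly that of the slowest request rather than the sum of all of them, which is the advantage concatMapEager has over concatMap.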


Additionally, if the API returns a Flowable, it is possible to use .parallel, which is still in beta at the time of writing (RxJava 2.1.7). But then the results are not in order, and I don't know a way (yet?) to order them without sorting afterwards.

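api1.items(queryParam) // assuming a variant that returns Flowable<Item>
   .parallel(10)
   .runOn(Schedulers.io())
   // flatMap rather than map, so the inner result is flattened;
   // this assumes api2.extendedInfo returns a Flowable<ItemExtended>
   .flatMap(item -> api2.extendedInfo(item.id()))
   .sequential();     // Flowable<ItemExtended>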
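
For the "sort afterwards" fallback, one simple approach is to tag each item with its original index before the parallel step and sort on that index at the end. The sketch below shows the idea with a JDK parallel stream standing in for the parallel RxJava pipeline; the Indexed class, the method name and the "ext-" prefix are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: restore source order after unordered parallel work.
final class SortAfterParallelDemo {

    /** Pairs a result with the position of its source item. */
    static final class Indexed {
        final int index;
        final String value;

        Indexed(int index, String value) {
            this.index = index;
            this.value = value;
        }
    }

    static List<String> fetchAllInOrder(List<String> ids) {
        Queue<Indexed> unordered = new ConcurrentLinkedQueue<>();
        // Parallel step: results land in the queue in whatever order they finish.
        IntStream.range(0, ids.size())
                .parallel()
                .forEach(i -> unordered.add(new Indexed(i, "ext-" + ids.get(i))));
        // Sort on the remembered index to get back the original order.
        return unordered.stream()
                .sorted(Comparator.comparingInt((Indexed entry) -> entry.index))
                .map(entry -> entry.value)
                .collect(Collectors.toList());
    }
}
```

The cost is that nothing can be emitted until all results are in, so this suits cases where the full list is needed anyway.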
