RxJava Combine Sequence Of Requests
The Problem
I have two APIs. API 1 gives me a list of items, and API 2 gives me more detailed information for each of the items I got from API 1. The way I have solved it so far results in bad performance.
The Question
I am looking for an efficient and fast solution to this problem with the help of Retrofit and RxJava.
My Approach
At the moment my solution looks like this:
Step 1: Retrofit executes Single<ArrayList<Information>> from API 1.
Step 2: I iterate through these items and make a request to API 2 for each one.
Step 3: Retrofit sequentially executes and returns a Single<ExtendedInformation> for each item.
Step 4: After all calls to API 2 have completed, I create a new object for every item, combining the Information and the ExtendedInformation.
My Code
public void addExtendedInformations(final Information[] informations) {
    final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
    final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
        @Override
        public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
            informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
            // Notify the listener once every item has received its extended information
            if (informationDetailArrayList.size() >= informations.length) {
                listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
            }
        }
    };
    // One request to Api 2 per item
    for (Information information : informations) {
        getRatingsByTitle(ratingRequestListener, information);
    }
}

public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
    Single<ExtendedInformation> repos = service.findForTitle(information.title);
    disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
        @Override
        public void onSuccess(ExtendedInformation extendedInformation) {
            ratingRequestListener.onDownloadFinished(information, extendedInformation);
        }

        @Override
        public void onError(Throwable e) {
            // Fall back to an empty ExtendedInformation so the item still counts as finished
            ExtendedInformation extendedInformation = new ExtendedInformation();
            // Argument order matches onDownloadFinished(Information, ExtendedInformation)
            ratingRequestListener.onDownloadFinished(information, extendedInformation);
        }
    }));
}

public interface RatingRequestListener {
    void onDownloadFinished(Information information, ExtendedInformation extendedInformation);
}
tl;dr: use concatMapEager or flatMap and execute the sub-calls asynchronously or on a scheduler.
long story
I'm not an Android developer, so my answer will be limited to pure RxJava (version 1 and version 2).
If I get the picture right, the needed flow is:
some query param
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item 1
          |-> Execute query for item 2 on API_2 -> extended info of item 2
          |-> Execute query for item 3 on API_2 -> extended info of item 3
          ...
          \-> Execute query for item n on API_2 -> extended info of item n
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param
Assuming Retrofit generated the clients for these interfaces:
interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}
interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}
If the order of the items is not important, then it is possible to use flatMap only:
api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)
But this only works if the Retrofit builder is configured in one of the following ways:
Either with the async adapter (calls will be queued in OkHttp's internal executor). I personally think this is not a good idea, because you don't have control over this executor.
    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync())
Or with the scheduler-based adapter (calls will be scheduled on an RxJava scheduler). This would be my preferred option, because you explicitly choose which scheduler is used; it will most likely be the IO scheduler, but you are free to try a different one.
    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
The reason is that flatMap will subscribe to each observable created by api2.extendedInfo(...) and merge them into the resulting observable. So results will appear in the order they are received.
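To make "in the order they are received" concrete, here is a minimal plain-Java analogy. It is not RxJava code: CompletableFuture stands in for the inner observables, and the class name, method names and the "extended-N" strings are hypothetical. The futures are completed by hand so the arrival order (3, 1, 2) is fixed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy of flatMap's merge behaviour; this is not RxJava code.
class MergeOrderSketch {

    // Registers a callback on every call and records each result as soon as it completes.
    static List<String> mergeInCompletionOrder(List<CompletableFuture<String>> calls) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        for (CompletableFuture<String> call : calls) {
            call.thenAccept(received::add);
        }
        return received;
    }

    // Hypothetical scenario: three sub-calls whose responses arrive in the order 3, 1, 2.
    static List<String> demo() {
        CompletableFuture<String> item1 = new CompletableFuture<>();
        CompletableFuture<String> item2 = new CompletableFuture<>();
        CompletableFuture<String> item3 = new CompletableFuture<>();
        List<String> received = mergeInCompletionOrder(Arrays.asList(item1, item2, item3));
        item3.complete("extended-3");
        item1.complete("extended-1");
        item2.complete("extended-2");
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The merged list follows completion order rather than source order, which is the trade-off flatMap makes in exchange for not waiting on slow items.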
If the Retrofit client is not set to be async or to run on a scheduler, it is possible to set one locally:
api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)
This structure is almost identical to the previous one, except that it indicates locally on which scheduler each api2.extendedInfo call is supposed to run.
It is possible to tune the maxConcurrency parameter of flatMap to control how many requests are performed at the same time. I'd be cautious with this one, though: you don't want to run all queries at the same time. Usually the default maxConcurrency is good enough (128 for Flowable in RxJava 2; note that Observable.flatMap does not limit concurrency unless maxConcurrency is passed explicitly).
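As a rough illustration of what a concurrency cap does, the following plain-Java sketch bounds the number of in-flight calls with a fixed thread pool and records the peak. It is an analogy, not RxJava: the pool size plays the role of maxConcurrency, and the class name, method names and the 20 ms pause are hypothetical stand-ins for a network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java analogy of a maxConcurrency limit; this is not RxJava code.
class BoundedConcurrencySketch {

    // Hypothetical stand-in for the latency of one API 2 request.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs itemCount simulated calls on a pool of maxConcurrency threads
    // and returns the highest number of calls that were in flight at once.
    static int peakInFlight(int itemCount, int maxConcurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<CompletableFuture<Void>> calls = new ArrayList<>();
            for (int i = 0; i < itemCount; i++) {
                calls.add(CompletableFuture.runAsync(() -> {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    pause(20);
                    inFlight.decrementAndGet();
                }, pool));
            }
            calls.forEach(CompletableFuture::join); // wait for every call to finish
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak in flight: " + peakInFlight(8, 2));
    }
}
```

With a limit of 2 the peak never exceeds 2, no matter how many items the first call returned; the remaining calls simply wait their turn.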
Now, if the order of the original query matters: concatMap is the operator that does the same thing as flatMap but in order, and therefore sequentially, which turns out to be slow because each sub-query has to finish before the next one starts. The solution is to go one step further with concatMapEager: it subscribes to the inner observables eagerly, in order, and buffers the results as needed so that they are emitted in the original order.
Assuming the Retrofit clients are async or run on a specific scheduler:
api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .concatMapEager(item -> api2.extendedInfo(item.id()))
    .subscribe(...)
Or, if the scheduler has to be set locally:
api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList))
    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)
It is also possible to tune the concurrency in this operator.
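The "start everything eagerly, emit in source order" idea can be sketched in plain Java as follows. This is an analogy rather than RxJava code: every call is submitted up front, and the results are then read back in the original order, waiting only where a result is not ready yet. The class name, the helper names, the delays and the "extended-N" strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-Java analogy of concatMapEager's ordering; this is not RxJava code.
class EagerOrderedSketch {

    // Starts every call immediately, then collects the results in source order.
    static <T, R> List<R> fetchInSourceOrder(List<T> items, Function<T, R> call, ExecutorService pool) {
        List<CompletableFuture<R>> inFlight = new ArrayList<>();
        for (T item : items) {
            inFlight.add(CompletableFuture.supplyAsync(() -> call.apply(item), pool));
        }
        List<R> ordered = new ArrayList<>();
        for (CompletableFuture<R> pending : inFlight) {
            ordered.add(pending.join()); // finished results simply wait here until their turn
        }
        return ordered;
    }

    // Hypothetical scenario: item 1 is the slowest call, item 3 the fastest.
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return fetchInSourceOrder(Arrays.asList(1, 2, 3), id -> {
                try {
                    Thread.sleep(200L / id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "extended-" + id;
            }, pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Even though the call for item 1 finishes last, the output keeps the source order, and the total wait is roughly that of the slowest call rather than the sum of all calls.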
Additionally, if the API returns a Flowable, it is possible to use .parallel, which is still in beta at this time (RxJava 2.1.7). The results are then not in order, however, and I don't know a way (yet?) to order them other than sorting afterwards.
api.items(queryParam) // Flowable<Item>
    .parallel(10)
    .runOn(Schedulers.io())
    // flatMap rather than map, so the rail carries ItemExtended instead of a nested stream;
    // this assumes api2.extendedInfo also returns a Flowable in this variant
    .flatMap(item -> api2.extendedInfo(item.id()))
    .sequential(); // Flowable<ItemExtended>