重复调用API,直到整个数据被下载数据 [英] Call API repeatedly until entire data has been is downloaded data

查看:209
本文介绍了重复调用API,直到整个数据被下载数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个用例,我想反复为每个用户调用一个Web API,直到下载整个数据。

I have a use case where I want to call a web API for each user repeatedly until entire data is downloaded.

我拥有的Web API,允许最大化每个API请求为用户提取100条记录。我可以为该用户指定 startTime 和要下载的记录的结束时间戳:

The web API that I have, allows maximum 100 records to be fetched for a user per API request. I can specify startTime and end timestamp of the records to be download for that user:

void downloadRecord(String userId, recordStartTime, recordEndTime, int countOfRecord, ResultCallBack)

如果 recordStartTime recordEndTime 之间的记录数超过100,然后API响应只返回100条记录。然后我必须通过新的开始时间(刚下载的第100条记录的时间)来调用此apis,直到下载所有记录。

If the number of records between recordStartTime and recordEndTime is more than 100, then API response only returns 100 records. And then I have to loop through to call this apis with new start time (time of the 100th record just downloaded), until all the records are downloaded.

final long recordStartTime = timeStampFrom2DaysAgo//;
Observable.from(arrayListOfUserIds).flatMap(new Func1<String, Observable<?>>() {
    @Override
    public Observable<?> call(String userId) {
        long recordEndTime = getCurrentTimeMS();
        //keep downloading records 
        downloadRecord(userId, recordStartTime, recordEndTime, 100, new ResultCallBack() {
            //if records are < 100 then stop 
            //else keep downloading 
        });
    }
}).subscribe();

请提示是否有可用于解决问题的RxJava示例代码。

Please suggest if there is a RxJava sample code that I can use to solve my problem.

谢谢

推荐答案

这是一个循环相关的Observables。 想象一下,您有一个Observable用于请求,一个Observable用于响应。 requests Observable发出一些 DownloadParams 对象,包含userId,recordStartTime,recordEndTime和countOfRecord,因此 Observable< ; DownloadParams>请求回复 Observable发出记录列表,即 Observable< List< Record>>回复

This is a case of circularly dependent Observables. Imagine you have one Observable for the requests and one Observable for the responses. requests Observable emits some DownloadParams objects, containing userId, recordStartTime, recordEndTime and countOfRecord, hence Observable<DownloadParams> requests. responses Observable emits the list of records, i.e., Observable<List<Record>> responses.

回复显然取决于请求,但不那么明显的部分是我们还需要请求依赖回复,因为从服务器下次下载某些 DownloadParams ,取决于我们从之前的下载中获得的响应。为了完整性,请求实际上还取决于一些初始化Observable,它发出userId来执行第一次下载。你可以用 .startWith(firstDownloadParams)替换这个初始化Observable。

responses obviously depends on requests, but the not so obvious part is that we need requests also to depend on responses, because the next download from the server, with certain DownloadParams, depends on what response we got from the previous download. For completeness, requests actually also depends on some initialization Observable which emits the userId to perform the first download. You can replace this initialization Observable with just a .startWith(firstDownloadParams).

无论如何,困难的部分是表达循环依赖。好消息是这在Rx中是可能的,并且已经基于RxJS的Cycle.js框架的重点。坏消息是我们无法在没有主题的情况下解决这个问题,这可能是相当不受欢迎的。

Anyway, the hard part is expressing the cyclic dependency. The good news is this is possible in Rx, and has been the focus of Cycle.js framework based on RxJS. The bad news is that we cannot solve this without Subjects, which might be rather undesirable.

最好保持RxJava代码尽可能的功能,但试图做我们遇到了问题。如果我们尝试将Observable声明为其他人的函数,我们得到:

It is better to keep RxJava code as functional as possible, but by attempting to do that we reach a problem. If we try to declare the Observables as a function of others, we get:

Observable<DownloadParams> requests = responses.flatMap( /* ... */ )
    .startWith(firstDownloadParams);
Observable<List<Record>> responses = requests.flatMap( /* ... */ );

这不会编译,因为第一个声明需要第二个声明,反之亦然。这是受试者可以提供的帮助。我们将其中一个Observable声明为主题:

This doesn't compile because the first declaration needs the second declaration, and vice versa. This is how subjects can help. We declare either one of those Observables as a Subject:

PublishSubject<List<Record>> responsesProxy = PublishSubject.create();
Observable<DownloadParams> requests = responsesProxy.flatMap( /* ... */ )
    .startWith(firstDownloadParams);
Observable<List<Record>> responses = requests.flatMap( /* ... */ );

主题 responsesProxy 将充当代理对于回复,以便我们能够声明请求,具体取决于 responsesProxy 。但现在我们需要 responsesProxy 来模仿回复。我们通过添加以下内容来实现:

The Subject responsesProxy will act as a proxy for responses, so that we are able to declare requests depending on responsesProxy. But now we need responsesProxy to imitate responses. We do that by adding this:

responses.subscribe(responsesProxy);

这会关闭循环依赖中的循环,但只记得在我们制作之后正确处理主题上面的订阅。

This closes the loop in the circular dependency, but just remember to properly dispose also the Subject since we made the subscription above.

现在我们只需要填写那些 flatMap 中的转换。在 requests.flatMap()中,您应该执行下载记录列表的网络调用。您可能希望将其作为Observable处理,而不是作为回调处理,以减轻与其他RxJava代码的相互作用。

Now we just need to fill in the transformations in those flatMap. In requests.flatMap( ) you should perform the network call that downloads the list of records. You probably want to handle this as an Observable, not as a callback, to ease the interplay with the rest of the RxJava code.

responsesProxy中.flatMap()你应检查记录列表是否为100或更多并使用新的recordStartTime创建下一个 DownloadParams ,包装为 Observable.just(newDownloadParams)。如果小于100,则返回 Observable.empty()

In responsesProxy.flatMap( ) you should check if the list of records are 100 or more and create the next DownloadParams with a new recordStartTime, wrapped as Observable.just(newDownloadParams). If less than 100, then return Observable.empty().

这篇关于重复调用API,直到整个数据被下载数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆