连续执行 rx.Obseravables [英] Executing rx.Obseravables secuentially

查看:42
本文介绍了连续执行 rx.Obseravables的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Fernando Ceja 的简洁架构开发 Android 应用.我的交互者或用例之一负责获取用户的提要数据.为了获取数据,首先我必须从数据库表中检索用户的团队,然后我必须从服务器端获取 Feed 列表.

这是我从数据库层获取团队的方式:

 mTeamCache.getAllTeams().subscribe(new DefaultSubscriber>() {@覆盖public void onNext(List simpleTeams) {super.onNext(simpleTeams);mTeams = simpleTeams;}});

TeamCache 基本上只是另一个交互器,负责获取我在数据库中拥有的所有团队.

以下是我从服务器端获取 Feed 数据的方法:

 mFeedRepository.getFeed(0, 50).subscribe(new ServerSubscriber>() {@覆盖protected void onServerSideError(Throwable errorResponse) {callback.onFeedFetchFailed(...);}@覆盖protected void onSuccess(List responseBody) {//用mTeams做事callback.onFeedFetched(...);}});

我的 GetFeedInteractor 类有一个名为 execute 的方法,我在此传递回调,稍后我将在 UI 中使用该回调来处理响应.所有这一切的问题在于,目前我正在像这样链接响应:

@Override公共无效执行(最终回调回调,字符串 userSipId){mTeamCache.getAllTeams().subscribe(new DefaultSubscriber>() {@覆盖public void onNext(List simpleTeams) {super.onNext(simpleTeams);mTeams = simpleTeams;getFeedFromRepository(回调);}});}public void getFeedFromRepository(最终回调回调){mFeedRepository.getFeedRx(0, 50).subscribe(new ServerSubscriber>() {@覆盖protected void onServerSideError(Throwable errorResponse) {callback.onFeedFetchFailed("failed");}@覆盖protected void onSuccess(List responseBody) {//用mTeams做事列表responseList = new ArrayList();for (ApiFeedResponse apiFeedResponse : responseBody) {responseList.add(FeedDataMapper.transform(apiFeedResponse));}callback.onFeedFetched(responseList);}});}

如您所见,一旦我从缓存交互器获取团队集合,我就会调用从同一个订阅者获取提要的方法.我不喜欢这个.我希望能够做一些更好的事情,比如使用 Observable.concat(getTeamsFromCache(), getFeedFromRepository());将调用链接到 Subscriber 中的另一个 rx.Observable 并不是一件好事.我想我的问题是,如何链接两个使用不同订阅者的 rx.Observable?

更新:

ServerSubscriber 是我实现订阅 Retrofit 服务的订阅者.它只是检查错误代码和一些东西.这是:

https://gist.github.com/4gus71n/65dc94de4ca01fb228>b07b07

默认订阅者是一个空的默认订阅者.这是:

https://gist.github.com/4gus71n/df501928fc5d24c240a3060

TeamCache#getAllTeams() 返回 rx.Observable>FeedRepository#getFeed(int page, int offset) 返回 rx.Observable>

更新 2:

这是现在获取用户提要的交互器的样子:

@Overridepublic void execute(final Callback callback, int offset, int pageSize) {用户用户 = mGetLoggedUser.get();String userSipid = mUserSipid.get();mFeedRepository.getFeed(offset, pageSize)//从服务端获取items.onErrorResumeNext(mFeedCache.getFeed(userSipid))//如果出现问题,从缓存中取出.mergeWith(mPendingPostCache.getAllPendingPostsAsFeedItems(user))//将响应与待处理的帖子合并.subscribe(new DefaultSubscriber>() {@覆盖public void onNext(List baseFeedItems) {callback.onFeedFetched(baseFeedItems);}@覆盖public void onError(Throwable e) {if (e instanceof ServerSideException) {//处理http错误} else if (e instanceof DBException) {//处理数据库缓存错误} 别的 {//处理一般错误}}});}

我认为您忽略了 RxJava 和响应式方法的要点,您不应该有具有 OO 层次结构和回调的不同订阅者.
你应该构造分离的 Observables ,它应该发出它处理的特定数据,没有 Subscriber,然后你可以根据需要链接你的 Observable,最后,您的订阅者会对链接的 Observable 流的预期最终结果做出反应.

像这样(使用 lambdas 来获得更精简的代码):

TeamCache mTeamCache = new TeamCache();FeedRepository mFeedRepository = new FeedRepository();Observable.zip(teamsObservable, feedObservable, Pair::new).subscribe(resultPair -> {//用mTeams做事列表responseList = new ArrayList();for (ApiFeedResponse apiFeedResponse : resultPair.second) {responseList.add(FeedDataMapper.transform(apiFeedResponse));}}, 可抛出 ->{//处理错误});

我使用了 zip 而不是 concat 因为看起来你在这里有 2 个独立的调用,你想等待它们完成(将它们压缩"在一起)然后采取行动,当然,因为你已经分离了 Observables 流,你可以根据需要将它们以不同的方式链接在一起.

对于带有所有响应验证逻辑的 ServerSubscriber,它也应该是 rxify,这样你就可以将它与服务器 Observable 流组合在一起.

这样的事情(为了简化而发出一些逻辑,因为我不熟悉它......)

Observable>teamObservable = mTeamCache.getAllTeams();Observable>feedObservable = mFeedRepository.getFeed(0, 50).flatMap(apiFeedsResponse -> {如果(apiFeedsResponse.code()!= 200){如果(apiFeedsResponse.code()== 304){列表body = apiFeedsResponse.body();返回 Observable.just(body);//onNotModified(o.body());} 别的 {返回 Observable.error(new ServerSideErrorException(apiFeedsResponse));}} 别的 {//onServerSideResponse(o.body());返回 Observable.just(apiFeedsResponse.body());}});

I'm developing an Android App using Fernando Ceja's clean architecture. One of my Interactors or Use Cases is in charge of getting the User's feed data. In order to get the data, first I have to retrieve the User's Teams from a database table and then I have to get the Feed list from the server-side.

This is how I get the Teams from the database layer:

    mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
        @Override
        public void onNext(List<SimpleTeam> simpleTeams) {
            super.onNext(simpleTeams);
            mTeams = simpleTeams;
        }
    });

TeamCache is basically just another Interactor that takes care of getting all the teams that I have in the database.

Here's how I get the Feed data from the server-side:

    mFeedRepository.getFeed(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
        @Override
        protected void onServerSideError(Throwable errorResponse) {
            callback.onFeedFetchFailed(...);
        }

        @Override
        protected void onSuccess(List<ApiFeedResponse> responseBody) {
            //Do stuff with mTeams
            callback.onFeedFetched(...);
        }
    });

My GetFeedInteractor class has a method called execute, where I pass through the Callback that I'm later using in the UI to handle the response. The issue with all this is that currently I'm chaining the responses like this:

@Override
public void execute(final Callback callback, String userSipId) {
    mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
        @Override
        public void onNext(List<SimpleTeam> simpleTeams) {
            super.onNext(simpleTeams);
            mTeams = simpleTeams;
            getFeedFromRepository(callback);
        }
    });
}

public void getFeedFromRepository(final Callback callback) {
    mFeedRepository.getFeedRx(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
        @Override
        protected void onServerSideError(Throwable errorResponse) {
            callback.onFeedFetchFailed("failed");
        }

        @Override
        protected void onSuccess(List<ApiFeedResponse> responseBody) {
            //Do stuff with mTeams
            List<BaseFeedItem> responseList = new ArrayList();

            for (ApiFeedResponse apiFeedResponse : responseBody) {
                responseList.add(FeedDataMapper.transform(apiFeedResponse));
            }

            callback.onFeedFetched(responseList);
        }
    });
}

As you can see, once that I get the Team collection from the Cache Interactor I call the method that gets the feed from the very same Subscriber. I don't like this. I want to be able to do something nicer, like using Observable.concat(getTeamsFromCache(), getFeedFromRepository()); chain a call to another rx.Observable inside a Subscriber is not something nice to do. I guess that my question is, how can I chain two rx.Observables that are using different Subscribers?

Update:

ServerSubscriber is a subscriber that I implemted to subscribe to Retrofit services. It simply checks the error codes and some stuff. Here is:

https://gist.github.com/4gus71n/65dc94de4ca01fb221a079b68c0570b5

Default subscriber is an empty default subscriber. Here is:

https://gist.github.com/4gus71n/df501928fc5d24c2c6ed7740a6520330

TeamCache#getAllTeams() returns rx.Observable> FeedRepository#getFeed(int page, int offset) returns rx.Observable>

Update 2:

This is how the Interactor to get the User's feed looks like now:

@Override
    public void execute(final Callback callback, int offset, int pageSize) {
        User user = mGetLoggedUser.get();
        String userSipid = mUserSipid.get();

        mFeedRepository.getFeed(offset, pageSize) //Get items from the server-side
                .onErrorResumeNext(mFeedCache.getFeed(userSipid)) //If something goes wrong take it from cache
                .mergeWith(mPendingPostCache.getAllPendingPostsAsFeedItems(user)) //Merge the response with the pending posts
                .subscribe(new DefaultSubscriber<List<BaseFeedItem>>() {
                    @Override
                    public void onNext(List<BaseFeedItem> baseFeedItems) {
                        callback.onFeedFetched(baseFeedItems);
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (e instanceof ServerSideException) {
                            //Handle the http error
                        } else if (e instanceof DBException) {
                            //Handle the database cache error
                        } else {
                            //Handle generic error
                        }
                    }
                });
    }

解决方案

I think you're missing the point of RxJava and reactive approach, you should not have different subscribers with OO hierarchy, and callbacks.
You should construct separated Observables that should emit the specific data it's handle, without the Subscriber, then you can chain you're Observable as needed, and at the end, you have the subscriber that react to the final result expected from the chained Observable stream.

something like this (using lambdas to have more thin code):

TeamCache mTeamCache = new TeamCache();
FeedRepository mFeedRepository = new FeedRepository();

Observable.zip(teamsObservable, feedObservable, Pair::new)
    .subscribe(resultPair -> {
            //Do stuff with mTeams
            List<BaseFeedItem> responseList = new ArrayList(); 
            for (ApiFeedResponse apiFeedResponse : resultPair.second) {
                responseList.add(FeedDataMapper.transform(apiFeedResponse));
            }
        }, throwable -> {
             //handle errors
           }
     );

I've use zip and not concat as it's seems you have 2 independent calls here that you want to wait for both to finish ('zip' them together) and then act upon, but ofcourse, as you have separated Observables stream, you can chain them together differently according to your needs.

as for your ServerSubscriber with all the response validation logic, it should be rxify too, so you can compose it along your server Observable stream.

something like this (some logic emitted to simplify, and as I'm not familiar with it...)

Observable<List<SimpleTeam>> teamsObservable = mTeamCache.getAllTeams();
Observable<List<ApiFeedResponse>> feedObservable = mFeedRepository.getFeed(0, 50)
            .flatMap(apiFeedsResponse -> {
                if (apiFeedsResponse.code() != 200) {
                    if (apiFeedsResponse.code() == 304) {
                        List<ApiFeedResponse> body = apiFeedsResponse.body();
                        return Observable.just(body);
                        //onNotModified(o.body());
                    } else {
                        return Observable.error(new ServerSideErrorException(apiFeedsResponse));
                    }
                } else {
                    //onServerSideResponse(o.body());
                    return Observable.just(apiFeedsResponse.body());
                }
            });

这篇关于连续执行 rx.Obseravables的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆