根据观察到的API和退订问题 [英] Observable-based API and unsubscribe issue

查看:170
本文介绍了根据观察到的API和退订问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用的Rx-Java创建适用于Android位置跟踪的类。我仍然不能弄清楚是如何妥善处理我的观测的生命周期。我想有是一个可观察的开始跟踪位置时,第一个收费情况,并在最后的申购将被丢弃停止位置跟踪。我达到到目前为止是这样的:

 公共类LocationObservable
    实现GooglePlayServicesClient.ConnectionCallbacks,
               GooglePlayServicesClient.OnConnectionFailedListener,
               LocationListener的{    私人LocationClient locationClient;
    私人最终PublishSubject<地点> latestLocation =
        PublishSubject.create();
    公众最终观测和LT;地点> locationObservable =
        Observable.defer(() - > {
        如果(locationClient.isConnected()及!&放大器;!locationClient.isConnecting()){
            locationClient.connect();
        }
        返回latestLocation.asObservable()扫描((preV,CURR) - 方式> {
            如果(Math.abs(prev.getLatitude() - curr.getLatitude())> 0.000001 ||
                Math.abs(prev.getLongitude() - curr.getLongitude())> 0.000001)
                返回CURR;
            其他
                返回preV;
        })distinctUntilChanged();});    公共LocationObservable(上下文的背景下){        locationClient =新LocationClient(背景下,这一点,这一点);
    }    @覆盖
    公共无效onConnected(束束){
        latestLocation.onNext(locationClient.getLastLocation());
    }    @覆盖
    公共无效onDisconnected(){
        latestLocation.onCompleted();
    }    @覆盖
    公共无效onConnectionFailed(ConnectionResult connectionResult){
        latestLocation.onError(新的异常(connectionResult.toString()));
    }    @覆盖
    公共无效onLocationChanged(地点){
        latestLocation.onNext(位置);
    }
}

正如你所看到的,我用观测#延迟来初始化位置回调时,首先客户端订阅。我不知道这是否是一个好方法,但它是我想出了目前最好的。有什么我仍然缺少的是如何从我观察到的停止位置更新,当最后一个客户端我的课取消订阅的。或者,也许这件事情不地道的接收,因为它不是明摆着的吗?

我相信,这种使用情况应该是相当标准的,因此应该有它一个标准的/惯用的解决方案。会很乐意知道。


解决方案

 私人LocationClient locationClient;
私人最终观测和LT;整数GT; locationObservable =观测
        .create(新OnSubscribe<整数GT;(){            @覆盖
            公共无效电话(订户LT ;?超级整数GT;用户){
                locationClient.connect();
                subscriber.add(Subscriptions.create(新Action0(){                    @覆盖
                    公共无效调用(){
                        如果(locationClient.isConnected()
                                || locationClient.isConnecting()){
                            locationClient.disconnect();
                        }
                    }                }));
            }        })多播。(PublishSubject<整数GT;创建())引用计数()。

I'm trying to use Rx-Java to create a class for location tracking on Android. What I can't still figure out is how to handle lifecycle of my Observable properly. What I want to have is an Observable that starts tracking location when first subscription happens, and stops location tracking when last subscription is discarded. What I achieved so far is this:

public class LocationObservable 
    implements GooglePlayServicesClient.ConnectionCallbacks, 
               GooglePlayServicesClient.OnConnectionFailedListener, 
               LocationListener {

    private LocationClient locationClient;
    private final PublishSubject<Location> latestLocation = 
        PublishSubject.create();
    public final Observable<Location> locationObservable = 
        Observable.defer(() -> {
        if(!locationClient.isConnected() && !locationClient.isConnecting()) {
            locationClient.connect();
        }
        return latestLocation.asObservable().scan((prev, curr) -> {
            if (Math.abs(prev.getLatitude() - curr.getLatitude()) > 0.000001 ||
                Math.abs(prev.getLongitude() - curr.getLongitude()) > 0.000001)
                return curr;
            else
                return prev;
        }).distinctUntilChanged();});

    public LocationObservable(Context context) {

        locationClient = new LocationClient(context, this, this);
    }

    @Override
    public void onConnected(Bundle bundle) {
        latestLocation.onNext(locationClient.getLastLocation());
    }

    @Override
    public void onDisconnected() {
        latestLocation.onCompleted();
    }

    @Override
    public void onConnectionFailed(ConnectionResult connectionResult) {
        latestLocation.onError(new Exception(connectionResult.toString()));
    }

    @Override
    public void onLocationChanged(Location location) {
        latestLocation.onNext(location);
    }
}

As you can see, I use Observable#defer to init location callbacks when first client subscribes. I don't know if it's a good approach, but it's the best I came up with at the moment. What I'm still missing is how to stop the location updates when last client of my class unsubscribes from my observable. Or maybe it's something non-idiomatic in Rx, since it's not obvious?

I believe, this use case should be rather standard, and therefore there should be a standard/idiomatic solution for it. Would be happy to know it.

解决方案

private LocationClient locationClient;
private final Observable<Integer> locationObservable = Observable
        .create(new OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                locationClient.connect();
                subscriber.add(Subscriptions.create(new Action0() {

                    @Override
                    public void call() {
                        if (locationClient.isConnected()
                                || locationClient.isConnecting()) {
                            locationClient.disconnect();
                        }
                    }

                }));
            }

        }).multicast(PublishSubject.<Integer> create()).refCount();

这篇关于根据观察到的API和退订问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆