Rx周期性观察到的发射值 [英] Rx Observable emitting values periodically

查看:108
本文介绍了Rx周期性观察到的发射值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我必须定期轮询一些RESTful端点以刷新我的android应用程序的数据.我还必须基于连接暂停和恢复它(如果电话处于脱机状态,则无需尝试).我当前的解决方案正在运行,但是它使用标准Java的 ScheduledExecutorService 来执行定期任务,但是我想继续使用Rx范例.

I have to poll some RESTful endpoint periodically to refresh my android app's data. I also have to pause and resume it based on connectivity (if the phone is offline, there's no need to even try). My current solution is working, but it uses standard Java's ScheduledExecutorService to perform periodic tasks, but I'd like to stay in Rx paradigm.

这是我当前的代码,为简洁起见,部分代码被跳过了.

Here's my current code, parts of which are skipped for brevity.

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });

        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;

            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }

            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();

networkStatusObservable 基本上是包裹在 Observable< Boolean> 中的广播接收器,表明电话已连接到网络.

networkStatusObservable is basically a broadcast receiver wrapped into Observable<Boolean>, indicating that the phone is connected to the network.

正如我所说,此解决方案有效,但是我想使用Rx方法进行定期轮询并发出新的 UserProfile ,因为手动安排事情有很多问题,我想避免.我了解 Observable.timer Observable.interval ,但无法弄清楚如何将它们应用于此任务(而且我不确定是否需要使用那些).

As I said, this solution is working, but I want to use Rx approach for periodic polling and emitting new UserProfiles, because there are numerous problems with scheduling things manually, which I want to avoid. I know about Observable.timer and Observable.interval, but can't figure out how to apply them to this task (and I'm not sure if I need to use those at all).

推荐答案

在此GitHub问题上,有一些方法可能会有所帮助.

There are a few approaches on this GitHub issue that you might find helpful.

https://github.com/ReactiveX/RxJava/issues/448

三个实现是:

Observable.interval

Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
        .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
            public Observable<Notification<AppState>> call(Long seconds) {
                return lyftApi.updateAppState(params).materialize(); } });


Scheduler.schedule定期

Observable.create({ observer ->
        Schedulers.newThread().schedulePeriodically({
            observer.onNext("application-state-from-network");
        }, 0, 1000, TimeUnit.MILLISECONDS);
    }).take(10).subscribe({ v -> println(v) });


手动递归

Observable.create(new OnSubscribeFunc<String>() {
        @Override
        public Subscription onSubscribe(final Observer<? super String> o) {
            return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
                @Override
                public Subscription call(Scheduler inner, Long t2) {
                    o.onNext("data-from-polling");
                    return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String v) {
            System.out.println("output: " + v);
        }
    });

结论是手动递归是一种方法,因为它会等到操作完成后再安排下一次执行.

And the conclusion is that manual recursion is the way to go because it waits until the operation is completed before scheduling the next execution.

这篇关于Rx周期性观察到的发射值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆