如何正确停止 rxjava Flowable? [英] How to properly stop rxjava Flowable?

查看:87
本文介绍了如何正确停止 rxjava Flowable?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码结构

服务

public Flowable entryFlow()
{
    return Flowable.fromIterable(this::getEntries)
}

消费者

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

private void onError(Throwable e)
{
    subscriptionFinished();
}

private void subscriptionFinished()
{
    //
}

我需要一种方法来阻止 flowable 在调用 stop 方法时获取和发出数据.

I need a way to stop the flowable from fetching and emitting data when the stop method is called.

通过执行以下操作,我注意到 doOnCancel lambda 并不总是被调用.

By doing the following, I noticed that the doOnCancel lambda is not always called.

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .doOnCancel(this::snapshotFinished)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

替代方案是

volatile stopped;

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .takeUntil(x -> stopped)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    stopped = true;
}

启动和停止的推荐实现是什么,以便流动停止发射并调用 onComplete 或类似方法(doOnCancel 操作?)?

What would be the recommended implementation of start and stop such that the flowable stops emitting and onComplete or a similar method (doOnCancel action?) is called?

稍后编辑:

让我的用例更短

调用disposable.dispose就足以阻止flowable从iterable获取数据并发送到源吗?我只有 1 个订阅者,并且无论原因如何,都需要在 Flowable 结束时调用 onComplete/onError/other-callback.

Is is enough to call disposable.dispose to stop the flowable getting data from iterable and emitting to source? I only have 1 subscriber and need to have either onComplete/onError/other-callback called when the flowable ends regardless of cause.

其他回调是指 doOnCancel/doFinally 等之一.

By other callback I mean one of doOnCancel/doFinally etc.

谢谢

推荐答案

我建议使用 dispose() 方法.然后只需添加 doOnDispose 来触发你的副作用代码.

I would recommend to use the dispose() method. And then just add doOnDispose to trigger your side-effect-code.

这篇关于如何正确停止 rxjava Flowable?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆