RXJava-使可观察的观察到(例如,带有缓冲区和窗口) [英] RXJava - make a pausable observable (with buffer and window for example)

查看:79
本文介绍了RXJava-使可观察的观察到(例如,带有缓冲区和窗口)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我要创建可观察的对象,以进行以下操作:

I want to create observables that do following:

  • 在所有项目暂停时对其进行缓冲
  • 在不暂停的情况下立即发出物品
  • 暂停/恢复触发器必须来自另一个可观察的
  • 必须将其保存以供不在主线程上运行的可观察对象使用,并且必须将其保存以更改主线程的暂停/恢复状态

我想使用BehaviorSubject<Boolean>作为触发器并将此触发器绑定到活动的onResumeonPause事件. (附加代码示例)

I want to use a BehaviorSubject<Boolean> as trigger and bind this trigger to an activity's onResume and onPause event. (Code example appended)

问题

我已经设置了一些东西,但是它没有按预期工作.我使用它的方式如下:

I've setup something, but it is not working as intended. I use it like following:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

当前的问题是,变量1应该可以正常工作,但有时事件只是不发出-阀门不发出,直到阀门一切正常为止(可能是穿线问题...)!解决方案2简单得多,似乎可以使用,但是我不确定它是否真的更好,我认为不是.我实际上不确定,为什么解决方案一有时会失败,所以我不确定解决方案2是否解决了(目前对我来说是未知的)问题...

Currently the problem is, that Variant 1 should work fine, but sometimes, the events are just not emitted - the valve is not emitting, until the valve everything is working (may be a threading problem...)! Solution 2 is much simplier and seems to work, but I'm not sure if it is really better, I don't think so. I'm actually not sure, why solution one is failing sometimes so I'm not sure if solution 2 solves the (currently for me unknown) problem...

有人可以告诉我可能是什么问题,或者简单的解决方案是否应该可靠地起作用?还是给我看一个可靠的解决方案?

Can someone tell me what could be the problem or if the simple solution should work reliably? Or show me a reliable solution?

代码

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser函数

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean ->  pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}

活动

public class BaseActivity extends AppCompatActivity {

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    public BaseActivity(Bundle savedInstanceState)
    {
        super(args);
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public PublishSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}

推荐答案

您实际上可以使用.buffer()运算符将其传递为可观察到的值,定义何时停止缓冲,以及从本书中获取示例:

You can actually use .buffer() operator passing it observable, defining when to stop buffering, sample from book:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println);

从第5章确定序列"开始:

from chapter 5, 'Taming the sequence': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

您可以将PublishSubject用作Observable,以在自定义运算符中提供元素.每次需要开始缓冲时,请按Observable.defer(() -> createBufferingValve())

You can use PublishSubject as Observable to feed it elements in your custom operator. Every time you need to start buffering, create instance by Observable.defer(() -> createBufferingValve())

这篇关于RXJava-使可观察的观察到(例如,带有缓冲区和窗口)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆