像 RxJava 中的 Subject 一样排队 [英] Queue like Subject in RxJava

查看:41
本文介绍了像 RxJava 中的 Subject 一样排队的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找可以:

  1. 如果没有订阅者,可以接收项目并将它们保存在队列或缓冲区中
  2. 一旦我们有一个订阅者,所有的项目都被消耗掉了,再也不会发出
  3. 我可以订阅/取消订阅主题

BehaviorSubject 几乎可以完成这项工作,但它保留了最后观察到的项目.

BehaviorSubject almost would do the job, but it retains last observed item.

更新

根据接受的答案,我为单个观察到的项目制定了类似的解决方案.还添加了取消订阅部分以避免内存泄漏.

Based on accepted answer I worked out similar solution for single observed item. Also added unsubscription part to avoid memory leaks.

class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
) : Observable<Any>(onSubscribe) {

    fun emit(value: Any) {
        if (state.subscriber.hasObservers()) {
            state.subscriber.onNext(value)
        } else {
            state.lastItem = value
        }
    }

    companion object {
        fun create(): LastEventObservable {
            val state = State()

            val onSubscribe = OnSubscribe<Any> { subscriber ->
                just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()

                val subscription = state.subscriber.subscribe(subscriber)

                subscriber.add(Subscriptions.create { subscription.unsubscribe() })
            }

            return LastEventObservable(onSubscribe, state)
        }
    }

    private class State {
        var lastItem: Any? = null
        val subscriber = PublishSubject.create<Any>()
    }
}

推荐答案

我实现了预期的结果,创建了一个定制的 Observable,它包装了一个发布主题并在没有订阅者的情况下处理发射缓存.看看吧.

I achieve the expected result creating a customized Observable that wraps a publish subject and handles emission cache if there's no subscribers attached. Check it out.

public class ExampleUnitTest {
    @Test
    public void testSample() throws Exception {
        MyCustomObservable myCustomObservable = new MyCustomObservable();

        myCustomObservable.emit("1");
        myCustomObservable.emit("2");
        myCustomObservable.emit("3");

        Subscription subscription = myCustomObservable.subscribe(System.out::println);

        myCustomObservable.emit("4");
        myCustomObservable.emit("5");

        subscription.unsubscribe();

        myCustomObservable.emit("6");
        myCustomObservable.emit("7");
        myCustomObservable.emit("8");

        myCustomObservable.subscribe(System.out::println);
    }
}

class MyCustomObservable extends Observable<String> {
    private static PublishSubject<String> publishSubject = PublishSubject.create();
    private static List<String> valuesCache = new ArrayList<>();

    protected MyCustomObservable() {
        super(subscriber -> {
            Observable.from(valuesCache)
                    .doOnNext(subscriber::onNext)
                    .doOnCompleted(valuesCache::clear)
                    .subscribe();

            publishSubject.subscribe(subscriber);
        });
    }

    public void emit(String value) {
        if (publishSubject.hasObservers()) {
            publishSubject.onNext(value);
        } else {
            valuesCache.add(value);
        }
    }
}

希望有帮助!

此致.

这篇关于像 RxJava 中的 Subject 一样排队的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆