像 RxJava 中的 Subject 一样排队 [英] Queue like Subject in RxJava
本文介绍了像 RxJava 中的 Subject 一样排队的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在寻找可以:
- 如果没有订阅者,可以接收项目并将它们保存在队列或缓冲区中
- 一旦我们有一个订阅者,所有的项目都被消耗掉了,再也不会发出
- 我可以订阅/取消订阅主题
BehaviorSubject
几乎可以完成这项工作,但它保留了最后观察到的项目.
BehaviorSubject
almost would do the job, but it retains last observed item.
更新
根据接受的答案,我为单个观察到的项目制定了类似的解决方案.还添加了取消订阅部分以避免内存泄漏.
Based on accepted answer I worked out similar solution for single observed item. Also added unsubscription part to avoid memory leaks.
class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {
fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}
companion object {
fun create(): LastEventObservable {
val state = State()
val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()
val subscription = state.subscriber.subscribe(subscriber)
subscriber.add(Subscriptions.create { subscription.unsubscribe() })
}
return LastEventObservable(onSubscribe, state)
}
}
private class State {
var lastItem: Any? = null
val subscriber = PublishSubject.create<Any>()
}
}
推荐答案
我实现了预期的结果,创建了一个定制的 Observable,它包装了一个发布主题并在没有订阅者的情况下处理发射缓存.看看吧.
I achieve the expected result creating a customized Observable that wraps a publish subject and handles emission cache if there's no subscribers attached. Check it out.
public class ExampleUnitTest {
@Test
public void testSample() throws Exception {
MyCustomObservable myCustomObservable = new MyCustomObservable();
myCustomObservable.emit("1");
myCustomObservable.emit("2");
myCustomObservable.emit("3");
Subscription subscription = myCustomObservable.subscribe(System.out::println);
myCustomObservable.emit("4");
myCustomObservable.emit("5");
subscription.unsubscribe();
myCustomObservable.emit("6");
myCustomObservable.emit("7");
myCustomObservable.emit("8");
myCustomObservable.subscribe(System.out::println);
}
}
class MyCustomObservable extends Observable<String> {
private static PublishSubject<String> publishSubject = PublishSubject.create();
private static List<String> valuesCache = new ArrayList<>();
protected MyCustomObservable() {
super(subscriber -> {
Observable.from(valuesCache)
.doOnNext(subscriber::onNext)
.doOnCompleted(valuesCache::clear)
.subscribe();
publishSubject.subscribe(subscriber);
});
}
public void emit(String value) {
if (publishSubject.hasObservers()) {
publishSubject.onNext(value);
} else {
valuesCache.add(value);
}
}
}
希望有帮助!
此致.
这篇关于像 RxJava 中的 Subject 一样排队的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文