如何在RxJava中明确表示Flowable的完成? [英] How can I explicitly signal completion of a Flowable in RxJava?
问题描述
我正在尝试创建一个 Flowable
,它包含一个 Iterable
。我会定期将元素推送到 Iterable
,但似乎完成事件是隐式的。我不知道如何表示处理完成。例如在我的代码中:
I'm trying to create a Flowable
which is wrapping an Iterable
. I push elements to my Iterable
periodically but it seems that the completion event is implicit. I don't know how to signal that processing is complete. For example in my code:
// note that this code is written in Kotlin
val iterable = LinkedBlockingQueue<Int>()
iterable.addAll(listOf(1, 2, 3))
val flowable = Flowable.fromIterable(iterable)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})
iterable.add(4)
Thread.sleep(1000)
iterable.add(5)
Thread.sleep(1000)
这打印:
1
2
3
4
已完成
1 2 3 4 completed
我检查了 Flowable
界面的来源,但似乎我无法发出明确完成 Flowable
的信号。我怎么能这样做?在我的程序中,我发布了它们之间有一些延迟的事件,我想明确何时完成
事件流。
I checked the source of the Flowable
interface but it seems that I can't signal that a Flowable
is complete explicitly. How can I do so? In my program I publish events which have some delay between them and I would like to be explicit when to complete
the event flow.
澄清:
我有一个长时间运行的进程,它会发出事件。我将它们收集在一个队列中,并且我公开了一个返回Flowable的方法,该Flowable包裹着我的队列。问题是当我创建Flowable时队列中可能已经存在元素。我只处理事件一次,我知道事件流何时停止,所以我知道何时需要完成Flowable。
Clarification: I have a long running process which emits events. I gather them in a queue and I expose a method which returns a Flowable which wraps around my queue. The problem is that there might be already elements in the queue when I create the Flowable. I will process the events only once and I know when the flow of events stops so I know when I need to complete the Flowable.
推荐答案
使用 .fromIterable
是为您的用例创建 Flowable
的错误方法。
我实际上并不清楚该用例是什么,但您可能想要使用 Flowable.create()
或 PublishSubject
Using .fromIterable
is the wrong way to create a Flowable
for your use case.
Im not actually clear on what that use case is, but you probably want to use Flowable.create()
or a PublishSubject
val flowable = Flowable.create<Int>( {
it.onNext(1)
it.onNext(2)
it.onComplete()
}, BackpressureStrategy.MISSING)
val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
//This data will be dropepd unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()
当然,你如何处理背压将取决于数据来源的性质。
Of course how you deal with back-pressure will depend on the nature of the source of data.
这篇关于如何在RxJava中明确表示Flowable的完成?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!