如何在RxJava中明确表示Flowable的完成? [英] How can I explicitly signal completion of a Flowable in RxJava?

查看:1174
本文介绍了如何在RxJava中明确表示Flowable的完成?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个 Flowable ,它包含一个 Iterable 。我会定期将元素推送到 Iterable ,但似乎完成事件是隐式的。我不知道如何表示处理完成。例如在我的代码中:

I'm trying to create a Flowable which is wrapping an Iterable. I push elements to my Iterable periodically but it seems that the completion event is implicit. I don't know how to signal that processing is complete. For example in my code:

    // note that this code is written in Kotlin
    val iterable = LinkedBlockingQueue<Int>()
    iterable.addAll(listOf(1, 2, 3))

    val flowable = Flowable.fromIterable(iterable)
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.computation())

    flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})

    iterable.add(4)

    Thread.sleep(1000)

    iterable.add(5)

    Thread.sleep(1000)

这打印:


1
2
3
4
已完成

1 2 3 4 completed

我检查了 Flowable 界面的来源,但似乎我无法发出明确完成 Flowable 的信号。我怎么能这样做?在我的程序中,我发布了它们之间有一些延迟的事件,我想明确何时完成事件流。

I checked the source of the Flowable interface but it seems that I can't signal that a Flowable is complete explicitly. How can I do so? In my program I publish events which have some delay between them and I would like to be explicit when to complete the event flow.

澄清
我有一个长时间运行的进程,它会发出事件。我将它们收集在一个队列中,并且我公开了一个返回Flowable的方法,该Flowable包裹着我的队列。问题是当我创建Flowable时队列中可能已经存在元素。我只处理事件一次,我知道事件流何时停止,所以我知道何时需要完成Flowable。

Clarification: I have a long running process which emits events. I gather them in a queue and I expose a method which returns a Flowable which wraps around my queue. The problem is that there might be already elements in the queue when I create the Flowable. I will process the events only once and I know when the flow of events stops so I know when I need to complete the Flowable.

推荐答案

使用 .fromIterable 是为您的用例创建 Flowable 的错误方法。

我实际上并不清楚该用例是什么,但您可能想要使用 Flowable.create() PublishSubject

Using .fromIterable is the wrong way to create a Flowable for your use case.
Im not actually clear on what that use case is, but you probably want to use Flowable.create() or a PublishSubject

val flowable = Flowable.create<Int>( {
    it.onNext(1)
    it.onNext(2)
    it.onComplete()
}, BackpressureStrategy.MISSING)

val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
//This data will be dropepd unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()

当然,你如何处理背压将取决于数据来源的性质。

Of course how you deal with back-pressure will depend on the nature of the source of data.

这篇关于如何在RxJava中明确表示Flowable的完成?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆