取消信号上的Kotlin流收集 [英] Cancel kotlin flow collection on signal
问题描述
我正在努力为Flow创建一个'takeUntilSignal'运算符-一种扩展方法,当另一个流生成输出时,该方法将取消该流.
I'm struggling to create a 'takeUntilSignal' operator for a Flow - an extension method that will cancel a flow when another flow generates an output.
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>
我的最初尝试是尝试在与主要流程集合相同的协程范围内启动信号流的收集,并取消协程范围:
My initial effort was to try to launch collection of the signal flow in the same coroutine scope as the primary flow collection, and cancel the coroutine scope:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
kotlinx.coroutines.withContext(coroutineContext) {
launch {
signal.take(1).collect()
println("signalled")
cancel()
}
collect {
emit(it)
}
}
}
但这是行不通的(并且使用Flow明确禁止使用的"withContext"方法,以防止使用).
But this isn't working (and uses the forbidden "withContext" method that is expressly stubbed out by Flow to prevent usage).
编辑 我将以下可憎性混为一谈,这很不符合定义(结果流只会在从主流中首次排放后才取消),我感觉那里还有更好的方法:
edit I've kludged together the following abomination, which doesn't quite fit the definition (resulting flow will only cancel after first emission from primary flow), and I get the feeling there's a far better way out there:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
combine(
signal.map { it as Any? }.onStart { emit(null) }
) { x, y -> x to y }
.takeWhile { it.second == null }
.map { it.first }
edit2 再试一次,使用channelFlow:
edit2 another try, using channelFlow:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
channelFlow {
launch {
signal.take(1).collect()
println("hello!")
close()
}
collect { send(it) }
close()
}
推荐答案
使用couroutineScope
并在其中启动新的协程:
Use couroutineScope
and start the new coroutine inside:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
try {
coroutineScope {
launch {
signal.take(1).collect()
println("signalled")
this@coroutineScope.cancel()
}
collect {
emit(it)
}
}
} catch (e: CancellationException) {
//ignore
}
}
这篇关于取消信号上的Kotlin流收集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!