onEach更改StateFlow中的调度程序(kotlin协程) [英] onEach changes the dispatcher in StateFlow (kotlin coroutines)
问题描述
想象以下独立的测试用例
Imagine the following self-contained test case
@Test
fun `stateFlow in GlobalScope`() = runBlockingTest {
suspend fun makeHeavyRequest(): String {
return "heavy result"
}
val flow1 = flowOf(Unit)
.map { makeHeavyRequest() }
.onEach { logThread("1: before flowOn") }
.flowOn(testDispatcher)
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
val flow2 = flowOf(Unit)
.map { makeHeavyRequest() }
.onEach { logThread("2: before flowOn") }
.flowOn(testDispatcher)
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
.onEach { logThread("2: after stateIn") }
val flow3 = flowOf(Unit)
.map { makeHeavyRequest() }
.onEach { logThread("3: before flowOn") }
.flowOn(testDispatcher)
.onEach { logThread("3: after flowOn") }
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
flow1.test {
assertEquals("heavy result", expectItem())
cancelAndIgnoreRemainingEvents()
}
flow2.test {
assertEquals("heavy result", expectItem())
cancelAndIgnoreRemainingEvents()
}
flow3.test {
assertEquals("heavy result", expectItem())
cancelAndIgnoreRemainingEvents()
}
}
运行它的效果将是:
Thread (1: before flowOn): Thread[main @coroutine#2,5,main]
Thread (2: before flowOn): Thread[main @coroutine#3,5,main]
Thread (2: after stateIn): Thread[main @coroutine#6,5,main]
Thread (3: before flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#8,5,main]
Thread (3: after flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#4,5,main]
org.opentest4j.AssertionFailedError:
Expected :heavy result
Actual :init state
在flow3
中,将onEach
放在flowOn
和stateIn
之间完全改变了调度程序并弄乱了结果.为什么会这样?
In flow3
placing the onEach
between flowOn
and stateIn
completely changes the dispatcher and messes up the result. Why is that?
推荐答案
发生这种情况的原因是因为stateIn
运算符根据上游流是否为ChannelFlow
进行了一些优化.
.flowOn(...)
返回ChannelFlow
,而.onEach(...)
不返回.
The reason this happens is because the stateIn
operator has some optimizations depending on whether the upstream flow is a ChannelFlow
.
.flowOn(...)
returns a ChannelFlow
while .onEach(...)
does not.
通常这并不重要.在您的情况下为什么如此重要,是因为您希望stateIn
返回的流永远不会发出初始值.但是有一个原因是该参数是强制性的,您应该期望收到初始值.您是否真正在很大程度上取决于上游流是否能够发出一个值而不会暂停.
Usually this would not really matter. Why it does matter in your case, is because you expect the flow returned by stateIn
to never emit the initial value. But there is a reason this parameter is mandatory and you should expect to receive the initial value. Whether you actually do mostly depends on whether the upstream flow is able to emit a value without suspending.
现在看来,stateIn
运算符中的一项优化是,它可能消耗ChannelFlow而不会暂停.这就是为什么您在使用时会得到预期的行为
Now it appears that the one of the optimizations in the stateIn
operator is, that it might consume a ChannelFlow without suspending. Thats why you get your expected behavior when using
.flowOn(testDispatcher) /*returns ChannelFlow*/
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
这篇关于onEach更改StateFlow中的调度程序(kotlin协程)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!