onEach更改StateFlow中的调度程序(kotlin协程) [英] onEach changes the dispatcher in StateFlow (kotlin coroutines)

查看:299
本文介绍了onEach更改StateFlow中的调度程序(kotlin协程)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

想象以下独立的测试用例

Imagine the following self-contained test case

@Test
fun `stateFlow in GlobalScope`() = runBlockingTest {

    suspend fun makeHeavyRequest(): String {
        return "heavy result"
    }

    val flow1 = flowOf(Unit)
        .map { makeHeavyRequest() }
        .onEach { logThread("1: before flowOn") }
        .flowOn(testDispatcher)
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")

    val flow2 = flowOf(Unit)
        .map { makeHeavyRequest() }
        .onEach { logThread("2: before flowOn") }
        .flowOn(testDispatcher)
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")
        .onEach { logThread("2: after stateIn") }

    val flow3 = flowOf(Unit)
        .map { makeHeavyRequest() }
        .onEach { logThread("3: before flowOn") }
        .flowOn(testDispatcher)
        .onEach { logThread("3: after flowOn") }
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")

    flow1.test {
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    }

    flow2.test {
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    }

    flow3.test {
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    }

}

运行它的效果将是:

Thread (1: before flowOn): Thread[main @coroutine#2,5,main]
Thread (2: before flowOn): Thread[main @coroutine#3,5,main]
Thread (2: after stateIn): Thread[main @coroutine#6,5,main]
Thread (3: before flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#8,5,main]
Thread (3: after flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#4,5,main]


org.opentest4j.AssertionFailedError: 
Expected :heavy result
Actual   :init state

flow3中,将onEach放在flowOnstateIn之间完全改变了调度程序并弄乱了结果.为什么会这样?

In flow3 placing the onEach between flowOn and stateIn completely changes the dispatcher and messes up the result. Why is that?

推荐答案

发生这种情况的原因是因为stateIn运算符根据上游流是否为ChannelFlow进行了一些优化.
.flowOn(...)返回ChannelFlow,而.onEach(...)不返回.

The reason this happens is because the stateIn operator has some optimizations depending on whether the upstream flow is a ChannelFlow.
.flowOn(...) returns a ChannelFlow while .onEach(...) does not.

通常这并不重要.在您的情况下为什么如此重要,是因为您希望stateIn返回的流永远不会发出初始值.但是有一个原因是该参数是强制性的,您应该期望收到初始值.您是否真正在很大程度上取决于上游流是否能够发出一个值而不会暂停.

Usually this would not really matter. Why it does matter in your case, is because you expect the flow returned by stateIn to never emit the initial value. But there is a reason this parameter is mandatory and you should expect to receive the initial value. Whether you actually do mostly depends on whether the upstream flow is able to emit a value without suspending.

现在看来,stateIn运算符中的一项优化是,它可能消耗ChannelFlow而不会暂停.这就是为什么您在使用时会得到预期的行为

Now it appears that the one of the optimizations in the stateIn operator is, that it might consume a ChannelFlow without suspending. Thats why you get your expected behavior when using

.flowOn(testDispatcher) /*returns ChannelFlow*/
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")

这篇关于onEach更改StateFlow中的调度程序(kotlin协程)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆