为什么不使用 GlobalScope.launch? [英] Why not use GlobalScope.launch?

查看:37
本文介绍了为什么不使用 GlobalScope.launch?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我读到非常不鼓励使用 Globalscope此处.

I read that usage of Globalscope is highly discouraged, here.

我有一个简单的用例.对于我收到的每条 kafka 消息(假设是一个 Id 列表),我必须拆分它并同时为每个 Id 调用一个休息服务,然后等待它完成并继续执行其他同步任务.该应用程序中没有其他东西需要协程.在这种情况下,我可以使用 Globalscope 吗?

I have a simple use-case. For every kafka message (let's say a list of Ids) that I receive I have to split it and invoke a rest service simultaneously for each of those Ids and wait for it to be done and proceed with other synchronous tasks. There is nothing else in that application that requires coroutine. In this case, Can I just get away with using Globalscope ?

注意:这不是安卓应用程序.它是一个运行在服务器端的 kafka 流处理器.这是一个在 Kubernetes 中运行的短暂、无状态、容器化 (Docker) 应用程序(如果您愿意,则符合流行语)

Note: This is not an android application. It's a kafka stream processor running on server side. It's an ephemeral, stateless, containerized (Docker) application running in Kubernetes (Buzzword-compliant if you will)

推荐答案

您应该使用结构化并发适当地确定并发范围.如果您不这样做,您的协程可能会泄漏.在您的情况下,将它们的范围限定为处理单个消息似乎是合适的.

You should scope your concurrency appropriately using structured concurrency. Your coroutines can leak if you don't do this. In your case, scoping them to the processing of a single message seems appropriate.

这是一个例子:

/* I don't know Kafka, but let's pretend this function gets 
 * called when you receive a new message
 */
suspend fun onMessage(msg: Message) {
    val ids: List<Int> = msg.getIds()    

    val jobs = ids.map { id ->
        GlobalScope.launch { restService.post(id) }
    }

    jobs.joinAll()
}

如果对 restService.post(id) 的调用之一失败并出现异常,该示例将立即重新抛出异常,所有尚未完成的作业将泄漏.它们将继续执行(可能无限期地执行),如果它们失败了,您将一无所知.

If one of the calls to restService.post(id) fails with an exception, the example will immediately rethrow the exception, and all the jobs that hasn't completed yet will leak. They will continue to execute (potentially indefinitely), and if they fail, you won't know about it.

要解决这个问题,您需要确定协程的范围.这是没有泄漏的相同示例:

To solve this, you need to scope your coroutines. Here's the same example without the leak:

suspend fun onMessage(msg: Message) = coroutineScope {
    val ids: List<Int> = msg.getIds()    

    ids.forEach { id ->
        // launch is called on "this", which is the coroutineScope.
        launch { restService.post(id) }
    }
}

在这种情况下,如果对 restService.post(id) 的调用之一失败,则协程范围内的所有其他未完成的协程将被取消.当你离开作用域时,你可以确定你没有泄露任何协程.

In this case, if one of the calls to restService.post(id) fails, then all other non-completed coroutines inside the coroutine scope will get cancelled. When you leave the scope, you can be sure that you haven't leaked any coroutines.

另外,因为 coroutineScope 会等到所有子协程完成,你可以放弃 jobs.joinAll() 调用.

Also, because coroutineScope will wait until all child-coroutines are done, you can drop the jobs.joinAll() call.

旁注:在编写启动一些协程的函数时,一个约定是让调用者使用接收器参数来决定协程范围.使用 onMessage 函数执行此操作可能如下所示:

Side note: A convention when writing a function that start some coroutines, is to let the caller decide the coroutine scope using the receiver parameter. Doing this with the onMessage function could look like this:

fun CoroutineScope.onMessage(msg: Message): List<Job> {
    val ids: List<Int> = msg.getIds()    

    return ids.map { id ->
        // launch is called on "this", which is the coroutineScope.
        launch { restService.post(id) }
    }
}

这篇关于为什么不使用 GlobalScope.launch?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆