为什么不使用GlobalScope.launch? [英] Why not use GlobalScope.launch?

查看:4176
本文介绍了为什么不使用GlobalScope.launch?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我听说强烈建议不要使用Globalscope此处.

I read that usage of Globalscope is highly discouraged, here.

我有一个简单的用例.对于收到的每条kafka消息(比如说一个ID列表),我必须将其拆分并同时调用rest服务,等待它完成并继续执行其他同步任务.该应用程序中没有其他需要协程的东西.在这种情况下,我可以摆脱它吗?

I have a simple use-case. For every kafka message (let's say a list of Ids) that I receive I have to split it and invoke a rest service simultaneously and wait for it to be done and proceed with other synchronous tasks. There is nothing else is in that application that requires coroutine. In this case, Can I just get away with it?

注意:这不是android应用程序.它只是在服务器端运行的kafka流处理器.这是一个在Kubernetes中运行的临时,无状态,容器化(Docker)应用程序(如果愿意,则符合Buzzword)

Note: This is not an android application. It's just a kafka stream processor running on server side. It's an ephemeral, stateless, containerized (Docker) application running in Kubernetes (Buzzword-compliant if you will)

推荐答案

您应该使用结构化并发对您的并发进行适当的范围划分.如果不这样做,协程可能会泄漏.在您的情况下,将它们限定为处理单个消息似乎很合适.

You should scope your concurrency appropriately using structured concurrency. Your coroutines can leak if you don't do this. In your case, scoping them to the processing of a single message seems appropriate.

这是一个例子:

/* I don't know Kafka, but let's pretend this function gets 
 * called when you receive a new message
 */
suspend fun onMessage(msg: Message) {
    val ids: List<Int> = msg.getIds()    

    val jobs = ids.map { id ->
        GlobalScope.launch { restService.post(id) }
    }

    jobs.joinAll()
}

如果对restService.post(id)的调用之一失败并出现异常,该示例将立即重新引发该异常,并且所有尚未完成的作业都将泄漏.它们将继续执行(可能无限期地执行),如果失败,您将一无所知.

If one of the calls to restService.post(id) fails with an exception, the example will immediately rethrow the exception, and all the jobs that hasn't completed yet will leak. They will continue to execute (potentially indefinitely), and if they fail, you won't know about it.

要解决此问题,您需要确定协程的范围.这是没有泄漏的相同示例:

To solve this, you need to scope your coroutines. Here's the same example without the leak:

suspend fun onMessage(msg: Message) = coroutineScope {
    val ids: List<Int> = msg.getIds()    

    ids.forEach { id ->
        // launch is called on "this", which is the coroutineScope.
        launch { restService.post(id) }
    }
}

在这种情况下,如果对restService.post(id)的调用之一失败,则协程范围内的所有其他未完成的协程将被取消.离开示波器时,可以确定没有泄漏任何协程.

In this case, if one of the calls to restService.post(id) fails, then all other non-completed coroutines inside the coroutine scope will get cancelled. When you leave the scope, you can be sure that you haven't leaked any coroutines.

此外,由于coroutineScope将等待所有子协程完成,因此您可以放弃jobs.joinAll()调用.

Also, because coroutineScope will wait until all child-coroutines are done, you can drop the jobs.joinAll() call.

旁注: 编写启动一些协程的函数时的惯例是,让调用者使用接收方参数来确定协程范围.使用onMessage函数执行此操作可能看起来像这样:

Side note: A convention when writing a function that start some coroutines, is to let the caller decide the coroutine scope using the receiver parameter. Doing this with the onMessage function could look like this:

fun CoroutineScope.onMessage(msg: Message): List<Job> {
    val ids: List<Int> = msg.getIds()    

    return ids.map { id ->
        // launch is called on "this", which is the coroutineScope.
        launch { restService.post(id) }
    }
}

这篇关于为什么不使用GlobalScope.launch?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆