Kotlin协程-在运行阻塞中使用主线程 [英] kotlin coroutines - use main thread in run blocking

查看:2092
本文介绍了Kotlin协程-在运行阻塞中使用主线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试执行以下代码:

I am trying to execute following code:

 val jobs = listOf(...)
 return runBlocking(CommonPool) {
    val executed = jobs.map {
        async { it.execute() }
    }.toTypedArray()
    awaitAll(*executed)
 }

其中jobs是某些Supplier的列表-在同步世界中,这应该仅创建例如int列表. 一切正常,但问题是未使用主线程. YourKit的Bellow屏幕截图:

where jobs is the list of some Suppliers - in synchronus world this should just create, for example, list of ints. Everything works fine, but the problem is the main thread is not utilized. Bellow screenshot from YourKit:

我想这里是runBlocking的问题,但是还有其他方法可以接收相同的结果吗?使用Java并行流,它看起来要好得多,但是主线程仍然没有被完全利用(任务是完全独立的).

I suppose runBlocking is the problem here, but is there other way to receive the same result? With Java parallel stream it looks far more better, but the main thread is still not utilized entirely (tasks are totally independent).

好吧,也许我告诉你的东西太少了. 在观看Vankant Subramaniam的演讲后不久,我的问题就来了: https://youtu.be/0hQvWIdwnw4 . 我需要最高的性能,没有IO,没有Ui等.只有计算.只有请求,我需要使用所有可用资源.

Ok, maybe I have told you too few things. My questions came some time after watching Vankant Subramaniam presentation : https://youtu.be/0hQvWIdwnw4. I need maximum performance, there is no IO, no Ui etc. Only computations. There is only request and I need to use all my available resources.

我认为必须将paralleizm设置为线程数+ 1,但是我认为这很愚蠢.

One think which I have is to set paralleizm to thread count + 1, but I think it is rather silly.

推荐答案

我使用Java 8并行流测试了该解决方案:

I tested the solution with Java 8 parallel streams:

jobs.parallelStream().forEach { it.execute() }

我发现CPU利用率可靠地达到了100%.作为参考,我使用了此计算作业:

I found the CPU utilization to be reliably on 100%. For reference, I used this computation job:

class MyJob {
    fun execute(): Double {
        val rnd = ThreadLocalRandom.current()
        var d = 1.0
        (1..rnd.nextInt(1_000_000)).forEach { _ ->
            d *= 1 + rnd.nextDouble(0.0000001)
        }
        return d
    }
}

请注意,其持续时间从零到执行100,000,000 FP乘法所需的时间随机变化.

Note that its duration varies randomly from zero up to the time it takes to perform 100,000,000 FP multiplications.

出于好奇,我还研究了您添加到问题中的代码,作为适合您的解决方案.我发现它有很多问题,例如:

Out of curiosity I also studied the code you added to your question as the solution that works for you. I found a number of issues with it, such as:

  • 将所有结果累积到一个列表中,而不是在可用时对其进行处理
  • 提交最后一份工作后立即关闭结果渠道,而不是等待所有结果

我编写了一些自己的代码,并添加了一些代码以对Stream API进行单行基准测试.在这里:

I wrote some code of my own and added code to benchmark the Stream API one-liner against it. Here it is:

const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }


fun parallelStream(): Double =
        jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })

fun channels(): Double {
    val resultChannel = Channel<Double>(UNLIMITED)

    val mainComputeChannel = Channel<MyJob>()
    val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
        GlobalScope.actor<MyJob>(Dispatchers.Default) {
            for (job in channel) {
                job.execute().also { resultChannel.send(it) }
            }
        }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that submits the jobs
    GlobalScope.launch {
        jobs.forEach { job ->
            select {
                allComputeChannels.forEach { chan ->
                    chan.onSend(job) {}
                }
            }
        }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_JOBS) {
            select<Unit> {
                mainComputeChannel.onReceive { job ->
                    job.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        sum
    }
}

fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
}

fun measure(task: String, measuredCode: () -> Double) {
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { _ -> block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
}

这是我的典型结果:

Parallel Stream took 396.85 ms
Channels took 398.1 ms

结果相似,但是一行代码仍然胜过50行代码:)

The results are similar, but one line of code still beats 50 lines of code :)

这篇关于Kotlin协程-在运行阻塞中使用主线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆