Kotlin-如何运行n个协程并等待前m个结果或超时? [英] Kotlin - How to run n coroutines and wait for first m results or timeout?

查看:87
本文介绍了Kotlin-如何运行n个协程并等待前m个结果或超时?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试编写一个将启动n个协程并等待第一个m完成的函数.如果m个协程在某个超时时间内无法完成,则所有协程/作业都将被取消.我的初始实现如下所示,但是我觉得可以改进.我最初的想法是使用父作业在其下运行所有​​其他作业,以便可以取消父作业并将其级联到剩余的子级.但是,这导致必须捕获TimeoutCancellationException.

I'm attempting to write a function that will start n coroutines and wait until the first m to complete. Should m coroutines fail to complete within some timeout, then all coroutines/jobs are canceled. My initial implementation for this is shown below, however I feel it can be improved. My initial thought was to use a parent job to run all other jobs under so the parent job can be cancelled and cascade down to the remaining children. However, this results in a TimeoutCancellationException that has to be caught.

我该如何编写一个函数来启动n个协程并等待第一个m协程完成,或者在m个协程可以完成之前发生超时?

private suspend fun queryAllHosts(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , maxSuccessfulHosts: Int
        , queryTimeout: Long
        , requestTimeout: Long
): ArrayList<QueryResult<ResultModel>> {
    val results = ArrayList<QueryResult<ResultModel>>()
    val rootJob = Job()

    try {
        withTimeout(queryTimeout, TimeUnit.MILLISECONDS) {
            queryFactories.map {
                async(parent = rootJob) {
                    val pagedResult = queryHost(
                            it
                            , query
                            , pageIndex
                            , requestTimeout
                    )

                    if (pagedResult.isSuccessful()) {
                        results.add(pagedResult)
                    }

                    if (results.size == maxSuccessfulHosts) {
                        rootJob.cancelAndJoin()
                        return@async
                    }
                }
            }.awaitAll()
        }
    } catch (ex: TimeoutCancellationException) {
        Log.w(Tag, "Query timed out, successful queries: ${results.size}")
    } catch (ignored: JobCancellationException) {
        // Ignored
    } catch (ex: Exception) {
        Log.w(Tag, "Unexpected exception", ex)
    }

    return results
}

更新

由于并发修改异常,我对先前接受的答案没有任何运气.以下是对原始答案的一些细微调整,以避免并发修改异常,但是它没有考虑到代码超时.

I didn't have any luck with the previously accepted answer due to concurrent modification exceptions. Below is a slight tweak on the original answer to avoid the concurrent modification exception, however it fails to respect the ticker timeout.

如何避免并发修改异常并仍然遵守代码超时?

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = HashSet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticker = ticker(timeoutMs)

    forEach { deferred ->
        deferred.invokeOnCompletion {
            if (!deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed) Value: ${deferred.getCompleted()}")
            } else {
                Log.d(Tag, "(Completed) Exception: $it")
            }
        }
    }

    val processed = HashSet<Deferred<T>>()

    val elapsedTime = measureTimeMillis {
        whileSelect {
            toAwait.minus(processed).forEach { deferred ->
                processed.add(deferred)
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }

            ticker.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

现在使用以下代码创建延迟的实例:

Deferred instances are created with the following code now:

private fun makeRequest(
        url: String
        , timeoutMs: Int
): Document? = try {
    Jsoup.connect(url).timeout(timeoutMs).get()
} catch (ex: Exception) {
    null
}

private fun createAsyncRequests(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , timeoutMs: Int
): List<Deferred<QueryResult<TorrentResult>>> = queryFactories.map { queryFactory ->
    async(start = CoroutineStart.LAZY) {
        try {
            val url = queryFactory(query, pageIndex)
            makeRequest(
                    url
                    , timeoutMs
            ).getQueryResult(pageIndex, url)
        } catch (ex: Exception) {
            QueryResult<TorrentResult>(state = QueryResult.State.ERROR)
        }
    }
}

更新

下面的日志显示提供的2,000ms timoutMs不被接受,因为超时发生在11,861ms:

Log below show supplied timoutMs of 2,000ms not being respected as timeout occurs at 11,861ms:

2018-09-13 22:39:00.307 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.315 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.454 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|1/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://indiapirate.com/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.455 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.456 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.470 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|2/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.471 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.472 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.479 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|3/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|4/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.482 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.500 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|5/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.be/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|6/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay.nz/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.578 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|7/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay6.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.603 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|8/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.605 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|9/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://uktpb.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.622 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.694 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|10/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxyproxy.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.712 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|11/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://fastpirate.link/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|12/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freepirate.eu/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.480 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|13/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://pirate.tel/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.481 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.482 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|14/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratesbay.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.650 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.780 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|15/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.781 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.782 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.131 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|16/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.250 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|17/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.251 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.253 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|18/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freeproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.297 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.441 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|19/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.442 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.443 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:04.826 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|20/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:04.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|21/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:05.325 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|22/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:05.926 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|23/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.002 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|24/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.117 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|25/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:06.338 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|26/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.923 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|27/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebay.red/search/hobbit+1977/0/7
2018-09-13 22:39:07.214 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|28/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:07.625 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|29/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateway.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.398 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|30/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:08.431 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|31/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.blue/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.684 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|32/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepiratepirate.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.033 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|33/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://unblocktpb.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.759 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|34/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:09.879 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|35/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbunblock.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.961 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|36/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.554 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|37/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:10.715 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|38/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://Piratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.855 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|39/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:11.014 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|40/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.fun/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.298 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|41/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.313 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|42/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.review/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.932 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|43/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.life/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.166 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|44/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.one/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.168 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Elapsed time: 11861
2018-09-13 22:39:12.885 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|45/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@b129740
2018-09-13 22:39:13.437 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|46/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c801679
2018-09-13 22:39:14.051 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|47/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@3f89fbe
2018-09-13 22:39:14.154 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|48/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c7c181f
2018-09-13 22:39:14.658 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|49/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@729e76c
2018-09-13 22:39:17.334 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|50/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@ae2a135

使用日志记录更新了代码(我删除了TimeoutException并返回了false:)

Updated code with logging (I removed the TimeoutException and returned false instead):

suspend fun <T> List<Deferred<T>>.awaitCount(
        count: Int
        , timeoutMs: Long
): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)
    var completedCount = 0

    forEach { deferred ->
        deferred.invokeOnCompletion {
            completedCount++

            if (deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed|$completedCount/$size) Exception: $it")
            } else {
                Log.d(Tag, "(Completed|$completedCount/$size) Value: ${deferred.getCompleted()}")
            }
        }
    }

    val elapsedTime = measureTimeMillis {
        whileSelect {
            ticket.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }

            toAwait.forEach { deferred ->
                Log.d(Tag, "Starting deferred..")
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

推荐答案

可以完全避免使用其他启动的任务和根作业来进行改进.

It can be improved by avoiding using additional launched task and root job at all.

kotlinx.coroutines 具有用于此类复杂运算符的 select 子句,非常适合您的用例.而且,很容易概括一下:

kotlinx.coroutines has select clause for such complex operators, which perfectly fits your use-case. Moreover, it's easy to generalize:

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)

    whileSelect {
        toAwait.forEach { deferred ->
            deferred.onAwait {
                toAwait.remove(deferred)
                result.add(it)
                result.size != count
            }
        }

        ticket.onReceive {
            val e = TimeoutException()
            toAwait.forEach { it.cancel(e) }
            throw e
        }
    }

    return result
}

然后您可以在 queryAllHosts 中使用它:

Then you can use it in queryAllHosts:

val queries = queryFactories.map { queryHost(...) }
return queries.awaitCount(maxSuccessfulHosts, queryTimeout)

您可以根据需要调整 awaitCount ,例如将其专门用于翻新并添加 isSuccessful 检查

You can adjust awaitCount in the way you want, e.g. specialize it for Retrofit and add isSuccessful check

这篇关于Kotlin-如何运行n个协程并等待前m个结果或超时?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆