Parallel request with Retrofit, Coroutines and Suspend functions


Question

I'm using Retrofit in order to make some network requests. I'm also using the Coroutines in combination with 'suspend' functions.

My question is: is there a way to improve the following code? The idea is to launch multiple requests in parallel and wait for them all to finish before continuing the function.

lifecycleScope.launch {
    try {
        itemIds.forEach { itemId ->
            withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
        }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

(Note that "MyService.getItem()" is a 'suspend' function.)

I guess that there is something nicer than a foreach in this case.

Does anyone have an idea?

Answer

I've prepared three approaches to solving this, from the simplest to the most correct one. To simplify the presentation of the approaches, I have extracted this common code:

lifecycleScope.launch {
    val itemById = try {
        fetchItems(itemIds)
    } catch (exception: Exception) {
        exception.printStackTrace()
        return@launch // bail out so itemById is always a Map, never Unit
    }
    Log.i(TAG, "Fetched these items: $itemById")
}

Before I go on, a general note: your getItem() function is suspendable, so you have no need to submit it to the IO dispatcher. All your coroutines can run on the main thread.
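For context, a Retrofit service exposing such a main-safe suspend call might be declared along these lines (the endpoint path and types here are an assumption, not taken from the question):

```kotlin
// Hypothetical Retrofit service. Since Retrofit 2.6, declaring the function
// as `suspend` makes Retrofit generate a main-safe implementation that moves
// the blocking network I/O off the calling thread for you.
interface MyService {
    @GET("items/{id}")
    suspend fun getItem(@Path("id") id: Long): Item
}
```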

Now let's see how we can implement fetchItems(itemIds).

Here we take advantage of the fact that all the coroutine code can run on the main thread:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = MyService.getItem(itemId) }
        }
    }
    return itemById
}

coroutineScope will wait for all the coroutines you launch inside it. Even though they all run concurrently to each other, the launched coroutines still dispatch to the single (main) thread, so there is no concurrency issue with updating the map from each of them.

The fact that it leverages the properties of a single-threaded context can be seen as a limitation of the first approach: it doesn't generalize to threadpool-based contexts. We can avoid this limitation by relying on the async-await mechanism:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
            .map { it.await() }
            .toMap()
}

Here we rely on two non-obvious properties of Collection.map():

  1. It performs all the transformations eagerly, so the first transformation to a collection of Deferred<Pair<Long, Item>> is completely done before entering the second stage, where we await on all of them.
  2. It is an inline function, which allows us to write suspendable code in it even though the function itself is not a suspend fun and gets a non-suspendable lambda (Deferred<T>) -> T.
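Property 1 is easy to verify with plain collections; this minimal stdlib-only sketch (no coroutines involved) shows that every lambda of the first map() runs before the second stage starts:

```kotlin
fun main() {
    val log = mutableListOf<String>()
    listOf(1, 2, 3)
        .map { log.add("started $it"); it * 10 }  // stage 1 runs for every element first
        .map { log.add("consumed $it") }          // stage 2 only begins afterwards
    // The first three entries are all "started", showing map() is eager;
    // a lazy Sequence would interleave "started"/"consumed" instead.
    println(log)
}
```

In the async-await version this eagerness is exactly what guarantees all the Deferred values are already launched before the first await() suspends.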

This means that all the fetching is done concurrently, but the map gets assembled in a single coroutine.

The above solved the concurrency for us, but it lacks any backpressure. If your input list is very large, you'll want to put a limit on how many simultaneous network requests you're making.

You can do this with a Flow-based idiom:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
        .asFlow()
        .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
            flow { emit(itemId to MyService.getItem(itemId)) }
        }
        .toMap()

Here the magic is in the .flatMapMerge operation. You give it a function (T) -> Flow<R> and it will execute it sequentially on all the input, but then it will concurrently collect all the flows it got. Note that I couldn't simplify flow { emit(getItem()) } to just flowOf(getItem()) because getItem() must be called lazily, while collecting the flow.
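That lazy-versus-eager distinction is the same one you see with plain lambdas; as a stdlib-only analogy (no Flow involved), wrapping a call in a block defers it, while evaluating it as an argument runs it immediately:

```kotlin
fun main() {
    var calls = 0
    fun getItem(): Int { calls++; return 42 }

    val eager = getItem()                    // like flowOf(getItem()): runs right away
    val deferred: () -> Int = { getItem() }  // like flow { emit(getItem()) }: deferred

    println(calls)       // 1: only the eager variant has executed so far
    println(deferred())  // 42: the deferred call runs only when invoked
    println(calls)       // 2
}
```

With flatMapMerge, that deferral is what lets the operator hold back the network call until one of the MAX_CONCURRENT_REQUESTS collection slots frees up.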

Flow.toMap() is not currently provided in the standard library, so here it is:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}

