Parallel request with Retrofit, Coroutines and Suspend functions


Question


I'm using Retrofit in order to make some network requests. I'm also using coroutines in combination with 'suspend' functions.


My question is: is there a way to improve the following code? The idea is to launch multiple requests in parallel and wait for them all to finish before continuing the function.

lifecycleScope.launch {
    try {
        itemIds.forEach { itemId ->
            withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
        }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}


(Note that "MyService.getItem()" is a 'suspend' function.)


I guess that there is something nicer than a foreach in this case.

Does anyone have an idea?

Answer


I've prepared three approaches to solving this, from the simplest to the most correct one. To simplify the presentation of the approaches, I have extracted this common code:

lifecycleScope.launch {
    val itemById = try {
        fetchItems(itemIds)
    } catch (exception: Exception) {
        exception.printStackTrace()
    }
    Log.i(TAG, "Fetched these items: $itemById")
}


Before I go on, a general note: your getItem() function is suspendable, you have no need to submit it to the IO dispatcher. All your coroutines can run on the main thread.


Now let's see how we can implement fetchItems(itemIds).


Here we take advantage of the fact that all the coroutine code can run on the main thread:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = MyService.getItem(itemId) }
        }
    }
    return itemById
}


coroutineScope will wait for all the coroutines you launch inside it. Even though they all run concurrently to each other, the launched coroutines still dispatch to the single (main) thread, so there is no concurrency issue with updating the map from each of them.
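As a quick sketch of this behavior, the snippet below replaces `MyService.getItem()` with a hypothetical `delay()`-based stub (`fakeGetItem` is not part of the original code) and times the fetch, showing that the launched coroutines run concurrently while `coroutineScope` still waits for all of them:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for MyService.getItem(): simulates network latency.
suspend fun fakeGetItem(id: Long): String {
    delay(100)
    return "item-$id"
}

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, String> {
    val itemById = mutableMapOf<Long, String>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = fakeGetItem(itemId) }
        }
    }
    return itemById  // coroutineScope returns only after every child has completed
}

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    val items = fetchItems(listOf(1L, 2L, 3L))
    val elapsed = System.currentTimeMillis() - start
    println(items.size)                      // 3 -- all fetches completed
    println("concurrent: ${elapsed < 250}")  // ~100 ms total, not ~300 ms
}
```

Since `runBlocking` dispatches everything to a single thread, the unsynchronized `mutableMapOf` is safe here, just as it is on the Android main thread.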


The fact that it leverages the properties of a single-threaded context can be seen as a limitation of the first approach: it doesn't generalize to threadpool-based contexts. We can avoid this limitation by relying on the async-await mechanism:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
            .map { it.await() }
            .toMap()
}


Here we rely on two non-obvious properties of Collection.map():

  1. It performs all the transformations eagerly, so the first transformation to a collection of Deferred<Pair<Long, Item>> is completely done before entering the second stage, where we await on all of them.
  2. It is an inline function, which allows us to write suspendable code in it even though the function itself is not a suspend fun and gets a non-suspendable lambda (Deferred<T>) -> T.


This means that all the fetching is done concurrently, but the map gets assembled in a single coroutine.
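The two-stage behavior can be sketched as a runnable example, again substituting a hypothetical `fakeGetItem` stub for `MyService.getItem()`: all the `Deferred`s are started in the first `map` before the second `map` awaits any of them, so the total time is roughly one request, not five.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stub replacing MyService.getItem().
suspend fun fakeGetItem(id: Long): String {
    delay(100)
    return "item-$id"
}

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, String> = coroutineScope {
    itemIds.map { itemId -> async { itemId to fakeGetItem(itemId) } }  // stage 1: start all
        .map { it.await() }                                            // stage 2: await all
        .toMap()
}

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    val items = fetchItems((1L..5L).toList())
    val elapsed = System.currentTimeMillis() - start
    println(items.size)                      // 5
    println("concurrent: ${elapsed < 400}")  // ~100 ms, not ~500 ms
}
```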


The above solved the concurrency for us, but it lacks any backpressure. If your input list is very large, you'll want to put a limit on how many simultaneous network requests you're making.


You can do this with a Flow-based idiom:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
        .asFlow()
        .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
            flow { emit(itemId to MyService.getItem(itemId)) }
        }
        .toMap()


Here the magic is in the .flatMapMerge operation. You give it a function (T) -> Flow<R> and it will execute it sequentially on all the input, but then it will concurrently collect all the flows it got. Note that I couldn't simplify flow { emit(getItem()) } to just flowOf(getItem()) because getItem() must be called lazily, while collecting the flow.
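To see why laziness matters here, a small sketch with a hypothetical counting stub (`stubGetItem` and `calls` are illustration-only names) shows that the `flow { emit(...) }` builder defers the call until collection, whereas `flowOf(getItem())` would have to invoke the request up front, before `flatMapMerge` could throttle it:

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

var calls = 0

// Hypothetical stub: counts how many times it is actually invoked.
suspend fun stubGetItem(id: Long): String {
    calls++
    return "item-$id"
}

fun main() = runBlocking {
    val lazyFlow = flow { emit(stubGetItem(42L)) }
    println(calls)        // 0 -- building the flow runs nothing
    lazyFlow.collect { }  // the block runs only now, during collection
    println(calls)        // 1 -- stubGetItem() was called lazily
}
```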


Flow.toMap() is not currently provided in the standard library, so here it is:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}
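A quick usage check of this helper on a plain flow of pairs; note that, since the underlying map is mutable, a later pair with the same key overwrites an earlier one:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}

fun main() = runBlocking {
    val m = flowOf(1L to "a", 2L to "b", 2L to "c").toMap()
    println(m)  // {1=a, 2=c} -- the later value for key 2 wins
}
```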

