如何限制通过 Monix 发送 HTTP get 请求? [英] How can I throttle sending HTTP get requests via Monix?

查看:73
本文介绍了如何限制通过 Monix 发送 HTTP get 请求?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

基于我之前的问题 以及来自 Artem 的见解,我的目标是将获取请求发送到给定的 url,并使用 Monix 的节流功能以隔离请求(以避免达到速率限制).

Build on my earlier question, and with insights from Artem, my objective is to send get requests to a given url, and use Monix's throttling feature to space out the requests (to avoid hitting rate limits).

预期的工作流程类似于:

The expected workflow looks something like:

make 1 (or more) api call(s) -> apply back-pressure/pausing (based on throttle) -> make the next request -> so on and so forth..

这是我到目前为止所尝试的(以下是我实际代码的简化片段):

This is what I have tried so far (below is a simplified snippet of my actual code):

import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close())
  }

  import monix.execution.Scheduler.Implicits.global

  val flat: Unit = activities.runToFuture.foreach { x =>
    val r: List[Task[Response[Either[String, String]]]] = x // List with size 3
    Observable
      .fromIterable(r)
      .throttle(6 second, 1)
      .map(_.runToFuture)
      .subscribe()
  }
  while (true) {}
}
  

这是获取数据的函数的样子:

And this is how the function for fetching the data looks like:

  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ) = {
    println(uri)
    val task = basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()

    task
  }

我已经尝试运行上述代码,但我仍然看到所有 get 请求都被触发,中间没有任何间距.

I have tried running the aforementioned code and I still see that all the get requests are fired without any spacing in between.

为了说明,我当前的 api 调用日志类似于:

For illustration, my current api call logs look something like:

//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)

我正在努力实现类似于:

And I am trying to achieve something similar to:

//(https://mr.foos.api/v1), Sat Aug 08 18:50:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:18 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:21 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:24 CEST 2020)

更新:

  • 我已使用 Beeceptor 将 api 设置为可模拟.在我看来,打印语句是从调用函数发出的,但实际上并未发送请求.我还更新了我的函数调用以解析为字符串(只是为了简单起见)但是,当我尝试限制对模拟 api 的请求时,它仍然没有收到任何请求.
  • I have set up the api to be mockable using Beeceptor. And it seems to me that the print statements are made from the calling function but the requests are not actually sent. I have also updated my function call to be parsing as a string (just for simplicity)However, when I try to throttle the request to the mock api it still does not receive any requests.

推荐答案

所以如果我理解正确的话,你有这样的类型:

So if i have understood right you have types like this:

object ObservableTest extends App  {
  type Response = Either[ResponseError[Error], Activities]
  case class Activities()
  val activities: Task[List[Response]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    def fetchData(uri: String): Task[Response] = ???
    val ids: Task[List[Int]] = ??? // a Task containing a List of IDs (from a previous step)
    val func: String => Task[Response] = (i: String) => fetchData("someUri") // a function that will be used to call an endpoint
    val data: Task[List[Task[Response]]] = ids map (_ map (id => func(id.toString))) // Maps API calling-function to the ids
    val activitiesData: Task[List[Response]] = data.flatMap(Task.parSequenceUnordered(_)) // Flattenned the previous step
    activitiesData.guarantee(backend.close())
  }
  import monix.execution.Scheduler.Implicits.global
  Observable(activities)
    .throttle(3 second, 1)
    .subscribe()
}

您的代码中的问题是您限制了一个包含多个动作的大任务,其中一些动作甚至是并行的(但这不是问题的根源).即使在类型中,您也可以看到 - 您应该从任务列表中进行观察(每个任务都会受到限制),而不是列表中的任务.

The problem in your code that you throttle the one big Task that contains multiple actions, some of them even parallel (but that not is the root of the problem). Even in types, you can see that - you should make observable from a list of tasks (each of them would be throttled), not the task of the list.

我实际上不知道 id 来自哪里,它可以成为评估管道的基石.但是,如果我们像示例中那样与他们一起完成任务.我们会这样做.

I actually don't know where ids come from and it can be the cornerstone of the evaluation pipeline. But if we have the task with them like in the example. We will do this.

import monix.eval.Task
import sttp.client.asynchttpclient.monix._
import monix.eval.Task._
import monix.reactive.Observable
import sttp.client.ResponseError

import scala.concurrent.duration.DurationInt

object ObservableTest extends App  {
  type Response = Either[ResponseError[Error], Activity]
  case class Activity()
  val activities: Task[List[Task[Response]]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    def fetchData(uri: String): Task[Response] = Task {
      println("mocked http request")
      Right(Activity())
    }
    val ids: Task[List[Int]] = Task { (1 to 100).toList} // a Task containing a List of IDs (from a previous step)
    val func: Int => Task[Response] = (i: Int) => fetchData(s"someUri_$i") // a function that will be used to call an endpoint
    val data: Task[List[Task[Response]]] = ids.map(_.map(func)) // Maps API calling-function to the ids
    data.guarantee(backend.close())
  }
  import monix.execution.Scheduler.Implicits.global

  Observable.fromTask(activities)
    .flatMap { listOfFetches: List[Task[Response]]  =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3.second, 1)
    .map(_.runToFuture) 
    .subscribe()
  
  while(true) {}
}

我们限制提取列表,而不是在内部执行所有提取的任务.

We throttle a list of fetches, not the task that does all fetches inside.

PS:有不清楚的地方请提问,我会在代码中添加注释

PS: Please ask questions what is unclear, I will add comments to the code

这篇关于如何限制通过 Monix 发送 HTTP get 请求?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆