在流中链接 Akka-http-client 请求 [英] Chain Akka-http-client requests in a Stream

查看:31
本文介绍了在流中链接 Akka-http-client 请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 akka-http-client 作为 Stream 链接 http 请求.链中的每个 http 请求都依赖于先前请求的成功/响应,并使用它来构建新的请求.如果请求不成功,Stream 应该返回不成功请求的响应.

I would like to chain http request using akka-http-client as Stream. Each http request in a chain depends on a success/response of a previous requests and uses it to construct a new request. If a request is not successful, the Stream should return the response of the unsuccessful request.

如何在 akka-http 中构建这样的流?我应该使用哪个 akka-http 客户端级 API?

How can I construct such a stream in akka-http? which akka-http client level API should I use?

推荐答案

如果您正在制作网络爬虫,请查看 这篇文章.该答案解决了更简单的情况,例如下载分页资源,其中下一页的链接位于当前页面响应的标题中.

If you're making a web crawler, have a look at this post. This answer tackles a more simple case, such as downloading paginated resources, where the link to the next page is in a header of the current page response.

您可以使用 Source.unfoldAsync 方法创建一个链接的源 - 一个项目通向下一个项目.这需要一个函数,它接受一个元素 S 并返回 Future[Option[(S, E)]] 以确定流是否应该继续发射 类型的元素E,将状态传递给下一次调用.

You can create a chained source - where one item leads to the next - using the Source.unfoldAsync method. This takes a function which takes an element S and returns Future[Option[(S, E)]] to determine if the stream should continue emitting elements of type E, passing the state to the next invocation.

就你而言,这有点像:

  1. 获取初始HttpRequest
  2. 生成Future[HttpResponse]
  3. 如果响应指向另一个URL,返回Some(request -> response),否则返回None

然而,有一个问题,那就是如果它不包含指向下一个请求的指针,它不会从流中发出响应.

However, there's a wrinkle, which is that this will not emit a response from the stream if it doesn't contain a pointer to the next request.

为了解决这个问题,你可以让传递给 unfoldAsync 的函数返回 Future[Option[(Option[HttpRequest], HttpResponse)]].这允许您处理以下情况:

To get around this, you can make the function passed to unfoldAsync return Future[Option[(Option[HttpRequest], HttpResponse)]]. This allows you to handle the following situations:

  • 当前响应是一个错误
  • 当前响应指向另一个请求
  • 当前响应未指向另一个请求

接下来是一些带注释的代码,概述了这种方法,但首先是初步的:

What follows is some annotated code which outlines this approach, but first a preliminary:

在将 HTTP 请求流式传输到 Akka 流中的响应时,您需要确保响应正文被消耗,否则会发生不好的事情(死锁等).如果您不需要正文,则可以忽略它,但这里我们使用一个函数将 HttpEntity 从(潜在)流转换为严格实体:

When streaming HTTP requests to responses in Akka streams, you need to ensure that the response body is consumed otherwise bad things will happen (deadlocks and the like.) If you don't need the body you can ignore it, but here we use a function to convert the HttpEntity from a (potential) stream into a strict entity:

import scala.concurrent.duration._

def convertToStrict(r: HttpResponse): Future[HttpResponse] =
  r.entity.toStrict(10.minutes).map(e => r.withEntity(e))

接下来,使用几个函数从 HttpResponse 创建一个 Option[HttpRequest].这个例子使用了类似于 Github 的分页链接的方案,其中 Links 标头包含,例如:<https://api.github.com/...>rel="next":

Next, a couple of functions to create an Option[HttpRequest] from an HttpResponse. This example uses a scheme like Github's pagination links, where the Links header contains, e.g: <https://api.github.com/...> rel="next":

def nextUri(r: HttpResponse): Seq[Uri] = for {
  linkHeader <- r.header[Link].toSeq
  value <- linkHeader.values
  params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri

def getNextRequest(r: HttpResponse): Option[HttpRequest] =
  nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))

接下来,我们将传递给 unfoldAsync 的真正函数.它使用 Akka HTTP Http().singleRequest() API 来获取 HttpRequest 并生成一个 Future[HttpResponse]:

Next, the real function we'll pass to unfoldAsync. It uses the Akka HTTP Http().singleRequest() API to take an HttpRequest and produce a Future[HttpResponse]:

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
  reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
      // handle the error case. Here we just return the errored response
      // with no next item.
      if (response.status.isFailure()) Future.successful(Some(None -> response))

      // Otherwise, convert the response to a strict response by
      // taking up the body and looking for a next request.
      else convertToStrict(response).map { strictResponse =>
        getNextRequest(strictResponse) match {
          // If we have no next request, return Some containing an
          // empty state, but the current value
          case None => Some(None -> strictResponse)

          // Otherwise, pass on the request...
          case next => Some(next -> strictResponse)
        }
      }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
  }

注意,如果我们得到一个错误的响应,流将不会继续,因为我们在下一个状态返回 None.

Note that if we get an errored response, the stream will not continue since we return None in the next state.

您可以调用它来获取 HttpResponse 对象的流,如下所示:

You can invoke this to get a stream of HttpResponse objects like so:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests)

至于返回最后(或错误)响应的值,您只需使用 Sink.last,因为流将在成功完成或第一个错误响应时结束.例如:

As for returning the value of the last (or errored) response, you simply need to use Sink.last, since the stream will end either when it completes successfully or on the first errored response. For example:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
      Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)

这篇关于在流中链接 Akka-http-client 请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆