在流中链接Akka-http-client请求 [英] Chain Akka-http-client requests in a Stream

查看:80
本文介绍了在流中链接Akka-http-client请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用akka-http-client作为Stream链接http请求。链中的每个http请求都取决于先前请求的成功/响应,并使用它来构造新请求。如果请求不成功,则Stream应该返回失败请求的响应。

I would like to chain http request using akka-http-client as Stream. Each http request in a chain depends on a success/response of a previous requests and uses it to construct a new request. If a request is not successful, the Stream should return the response of the unsuccessful request.

如何在akka-http中构造这样的流?
我应该使用哪个akka-http客户端级别的API?

How can I construct such a stream in akka-http? which akka-http client level API should I use?

推荐答案

如果要制作网络爬虫,请查看此帖子。此答案解决了一个更简单的情况,例如下载分页的资源,其中指向下一页的链接位于当前页面响应的标题中。

If you're making a web crawler, have a look at this post. This answer tackles a more simple case, such as downloading paginated resources, where the link to the next page is in a header of the current page response.

您可以使用 Source.unfoldAsync 方法创建一个链接的源,其中一个项目指向下一个项目。这需要一个函数,该函数采用元素 S 并返回 Future [Option [(S,E)]] 来确定如果流应继续发出 E 类型的元素,则将状态传递给下一个调用。

You can create a chained source - where one item leads to the next - using the Source.unfoldAsync method. This takes a function which takes an element S and returns Future[Option[(S, E)]] to determine if the stream should continue emitting elements of type E, passing the state to the next invocation.

在您的情况下,这类似于:

In your case, this is kind of like:


  1. 进行初始 HttpRequest

  2. 产生 Future [HttpResponse]

  3. 如果响应指向另一个URL,则返回 Some(request-> response),否则返回 None

  1. taking an initial HttpRequest
  2. producing a Future[HttpResponse]
  3. if the response points to another URL, returning Some(request -> response), otherwise None

但是,出现了皱纹,即如果出现以下情况,它不会从流中发出响应它不包含指向下一个请求的指针。

However, there's a wrinkle, which is that this will not emit a response from the stream if it doesn't contain a pointer to the next request.

要解决此问题,可以使函数传递给 unfoldAsync 返回 Future [Option [(Option [HttpRequest],HttpResponse)]] 。这使您可以处理以下情况:

To get around this, you can make the function passed to unfoldAsync return Future[Option[(Option[HttpRequest], HttpResponse)]]. This allows you to handle the following situations:


  • 当前响应为错误

  • 当前响应指向另一个请求

  • 当前响应未指向另一个请求

什么以下是一些带注释的代码,概述了此方法,但首先是一个初步的方法:

What follows is some annotated code which outlines this approach, but first a preliminary:

在Akka流中流式传输HTTP请求响应时,您需要确保响应主体被消耗掉,否则会发生坏事(死锁等)。如果不需要主体,则可以忽略它,但是这里我们使用函数来转换 HttpEntity 从(潜在的)流变成严格的实体:

When streaming HTTP requests to responses in Akka streams, you need to ensure that the response body is consumed otherwise bad things will happen (deadlocks and the like.) If you don't need the body you can ignore it, but here we use a function to convert the HttpEntity from a (potential) stream into a strict entity:

import scala.concurrent.duration._

def convertToStrict(r: HttpResponse): Future[HttpResponse] =
  r.entity.toStrict(10.minutes).map(e => r.withEntity(e))

接下来是几个创建<$ HttpResponse 中的c $ c> Option [HttpRequest] 。本示例使用类似Github的分页链接的方案,其中 Links 标头包含例如:< https://api.github.com/。 ..> rel = next

Next, a couple of functions to create an Option[HttpRequest] from an HttpResponse. This example uses a scheme like Github's pagination links, where the Links header contains, e.g: <https://api.github.com/...> rel="next":

def nextUri(r: HttpResponse): Seq[Uri] = for {
  linkHeader <- r.header[Link].toSeq
  value <- linkHeader.values
  params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri

def getNextRequest(r: HttpResponse): Option[HttpRequest] =
  nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))

接下来是真实函数我们将传递给 unfoldAsync 。它使用Akka HTTP Http()。singleRequest() API接受 HttpRequest 并生成 Future [HttpResponse]

Next, the real function we'll pass to unfoldAsync. It uses the Akka HTTP Http().singleRequest() API to take an HttpRequest and produce a Future[HttpResponse]:

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
  reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
      // handle the error case. Here we just return the errored response
      // with no next item.
      if (response.status.isFailure()) Future.successful(Some(None -> response))

      // Otherwise, convert the response to a strict response by
      // taking up the body and looking for a next request.
      else convertToStrict(response).map { strictResponse =>
        getNextRequest(strictResponse) match {
          // If we have no next request, return Some containing an
          // empty state, but the current value
          case None => Some(None -> strictResponse)

          // Otherwise, pass on the request...
          case next => Some(next -> strictResponse)
        }
      }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
  }

请注意,如果收到错误的回复,流不会继续,因为我们在下一个状态返回 None

Note that if we get an errored response, the stream will not continue since we return None in the next state.

您可以调用此函数以获取 HttpResponse 对象如下:

You can invoke this to get a stream of HttpResponse objects like so:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests)

关于返回上一个(或错误的)响应的值,您只需要使用 Sink.last ,因为该流将在成功完成时或在第一个错误响应时结束。例如:

As for returning the value of the last (or errored) response, you simply need to use Sink.last, since the stream will end either when it completes successfully or on the first errored response. For example:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
      Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)

这篇关于在流中链接Akka-http-client请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆