使用 Akka-Streams HTTP 将整个 HttpResponse 主体作为字符串获取 [英] Get whole HttpResponse body as a String with Akka-Streams HTTP

查看:30
本文介绍了使用 Akka-Streams HTTP 将整个 HttpResponse 主体作为字符串获取的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试了解如何使用新的 akka.http 库.我想向服务器发送一个 http 请求并将整个响应正文作为单个字符串读取,以生成一个 Source[String,?].

I'm trying to understand how to use the new akka.http library. I would like to send an http request to a server and read the whole response body as a single String in order to produce a Source[String,?].

这是迄今为止我能够产生的最佳解决方案:

Here is the best solution I was able to produce so far:

 def get(
   modelID: String,
   pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]
 ): Source[String,Unit] = {
   val uri = reactionsURL(modelID)
   val req = HttpRequest(uri = uri)
   Source.single( (req,0) )
     .via( pool )
     .map { 
       case (Success(resp),_) =>
         resp.entity.dataBytes.map( _.decodeString("utf-8") )
     }.flatten(FlattenStrategy.concat)
     .grouped( 1024 )
     .map( _.mkString )

它似乎工作得很好(除了缺少错误路径),但对于这样简单的任务来说有点笨重.有更聪明的解决方案吗?我可以避免 grouped/mkString 吗?

It seems to work well (except the missing error path), but it is a bit clunky for such simple tasks. Is there a smarter solution ? Can I avoid the grouped/mkString ?

推荐答案

可以使用HttpResponse 超时.它将整个答案收集为 Future.

You can use toStrict method of HttpResponse with timeout. It gathers whole answer as Future.

def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: Materializer): Future[Strict] Returns a sharable and serializable

def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: Materializer): Future[Strict] Returns a sharable and serializable

带有严格实体的此消息的副本.

copy of this message with a strict entity.

示例:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpResponse, HttpRequest}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Sink, Flow, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import scala.util.{Try, Success}

object Main extends App {

  implicit val system = ActorSystem()

  import system.dispatcher

  implicit val materializer = ActorMaterializer()

  val host = "127.0.0.1"
  lazy val pool = Http().newHostConnectionPool[Int](host, 9000)

  FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run()

}

object FlowBuilder {
  def get(modelID: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool])
         (implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = {
    val uri = modelID
    val req = HttpRequest(uri = modelID)
    Source.single((req, 0)).via(pool)
      .map { 
      case (Success(resp), _) => resp.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8")) 
    }
  }
}

这篇关于使用 Akka-Streams HTTP 将整个 HttpResponse 主体作为字符串获取的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆