Akka Stream + Akka Http - 获取错误请求 [英] Akka Stream + Akka Http - Get Request on Error

查看:34
本文介绍了Akka Stream + Akka Http - 获取错误请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下流,效果很好:

I have the following stream that works pretty well:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

还有一个简单的actor来处理流完成时的响应状态和最终消息:

and a simple actor to handle the response status and the final message when the stream completes:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

我不知道这是否是计算事物和处理响应状态的最佳方法,但到目前为止它是有效的.

I don't know if this is the best approach to count things and also to handle response status, but it's working so far.

问题是如何以一种我可以知道是什么请求导致特定错误的方式将原始请求传递给参与者.

The question is how to pass the original request to the actor in a way I could know what request caused a specific error.

我的演员可以期待以下内容:

My actor could expect the following instead:

case (request: String, response: HttpResponse) =>

但是如何传递我在管道开始时拥有的信息?

But how to pass that information that I have at the beginning of my pipeline?

我想像这样map:

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

但我不知道如何触发 Http 流.

But I have no idea on how to fire the Http flow.

有什么建议吗?

推荐答案

在@cmbaxter 的帮助下,我可以使用以下代码解决我的问题:

With @cmbaxter help, I could solve my problem using the following piece of code:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)

source
  .map(url => HttpRequest(uri = url) -> url)
  .via(poolClientFlow)
  .to(Sink.actorRef(myActor, IsDone))
  .run()

现在我的演员能够收到这个:

Now my actor is able to receive this:

case (respTry: Try[HttpResponse], request: String) =>

这篇关于Akka Stream + Akka Http - 获取错误请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆