Akka流+ Akka Http-获取错误请求 [英] Akka Stream + Akka Http - Get Request on Error

查看:65
本文介绍了Akka流+ Akka Http-获取错误请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下运行良好的流:

I have the following stream that works pretty well:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

和一个简单的actor在流完成时处理响应状态和最终消息:

and a simple actor to handle the response status and the final message when the stream completes:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

我不知道这是否是计算事物和处理响应状态的最佳方法,但到目前为止,它仍然有效

I don't know if this is the best approach to count things and also to handle response status, but it's working so far.

问题是如何以某种方式将原始请求传递给Actor,我可以知道是哪个请求导致了特定错误。

The question is how to pass the original request to the actor in a way I could know what request caused a specific error.

我的演员可能期望以下内容代替:

My actor could expect the following instead:

case (request: String, response: HttpResponse) =>

但是如何传递管道初期的信息呢?

But how to pass that information that I have at the beginning of my pipeline?

我正在想像这样的地图

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

但是我不知道如何触发Http流。

But I have no idea on how to fire the Http flow.

有任何建议吗?

推荐答案

在@cmbaxter帮助下,我可以使用以下代码来解决我的问题:

With @cmbaxter help, I could solve my problem using the following piece of code:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)

source
  .map(url => HttpRequest(uri = url) -> url)
  .via(poolClientFlow)
  .to(Sink.actorRef(myActor, IsDone))
  .run()

现在我的演员已经可以接收到:

Now my actor is able to receive this:

case (respTry: Try[HttpResponse], request: String) =>

这篇关于Akka流+ Akka Http-获取错误请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆