Akka-http在一个流程中处理来自不同连接的HttpRequest [英] Akka-http process HttpRequests from different connections in one flow

查看:70
本文介绍了Akka-http在一个流程中处理来自不同连接的HttpRequest的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Akka-http 文档说:

Akka-http documentation says:


除了将服务器端的套接字视为
Source [IncomingConnection],并将每个连接作为
Source [HttpRequest]和接收器[HttpResponse]

Apart from regarding a socket bound on the server-side as a Source[IncomingConnection] and each connection as a Source[HttpRequest] with a Sink[HttpResponse]

假设我们从多个Source [IncomingConnection]中获取包含传入连接的合并源。

Assume we get the merged source containing incoming connections from many Source[IncomingConnection].

然后,假设我们从Source [IncomingConnection]中获取Source [HttpRequest](请参见

Then, assume, we get Source[HttpRequest] from Source[IncomingConnection] (see the code below).

然后,没问题,我们可以提供将HttpRequest转换为HttpResponse的流程。

Then, no problem, we can provide a flow to convert HttpRequest to HttpResponse.

这是问题所在-我们如何正确接收响应?我们如何才能加入对连接的响应?

And here is the problem - how can we properly Sink the responses ? How can we join the responses to connections?

用例背后的全部思想是可以优先处理来自不同连接的传入请求。我想在许多情况下应该有用...

The whole idea behind the use case is the possibility to prioritize the processing of incoming requests from different connections. Should be useful in many cases I guess...

预先感谢!

编辑:
基于@RamonJRomeroyVigil的答案的解决方案:

Solution based on the answer from @RamonJRomeroyVigil:

服务器代码:

val in1 = Http().bind(interface = "localhost", port = 8200)
val in2 = Http().bind(interface = "localhost", port = 8201)

val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
  import FlowGraph.Implicits._

  val merge = b.add(Merge[IncomingConnection](2))

  in1 ~> print("in1") ~> merge.in(0)
  in2 ~> print("in2") ~> merge.in(1)

  SourceShape(merge.out)
})

val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
      .via(conn.flow)
      .map(request => (request, conn))
  }

val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] =
  Flow[(HttpRequest, IncomingConnection)].map{
      case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) =>
        println(s"${System.currentTimeMillis()}: " +
          s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
        (HttpResponse(entity = "pong"), conn)
    }

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
}).run()

def print(prefix: String) = Flow[IncomingConnection].map { s =>
  println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}"); s
}

因此,我从控制台使用curl并看到以下内容:

So, I am using curl from console and see the following:

% curl http://localhost:8200/ping
curl: (52) Empty reply from server

第二次卷曲请求失败:

% curl http://localhost:8200/ping
curl: (7) Failed to connect to localhost port 8200: Connection refused

在服务器控制台上,发送第一个请求时,我看到以下内容:

On the server console I see the following when sending 1st request:

in1 [ 1450287301512 ]: /127.0.0.1:52461
1450287301626: process request from localhost:52461
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

发送第二个请求时什么也没有。

And nothing when sending 2nd request.

因此,内部连接流(如@RamonJRomeroyVigil所述)或其他问题似乎存在问题。

So, it looks like there is some problem with the internal connection flow (as stated @RamonJRomeroyVigil) or with something else...

基本上,代码无法正常工作。

Basically the code does not work.

仍在调查问题。

推荐答案

以下解决方案基于有关问题注释中提供的更多信息。

The below solution is based on further information provided in the question comments.

给出

val connSrc : Source[IncomingConnection,_] = ???

flatMapConcat 方法解决了所述特定问题:

The flatMapConcat method solves the specific question stated:

val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
          .via(conn.flow)
          .map(request => (request, conn))
  }

这提供了(HttpRequest,IncomingConnection)元组的源。

假设您有一个将请求转换为响应的处理步骤

Assuming you have a processing step that converts requests to respones

val flow : Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] = ???

您可以将响应发送回客户端:

You can send a response back to the client:

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
})

警告:此解决方案调用 conn.flow 两次:一次创建一个生成请求的流,再一次创建一个发送响应的流。我不知道这种用例是否会破坏 IncomingConnection 逻辑中的某些内容。

Warning: This solution calls conn.flow twice: once to create a flow that generates requests and again to create a flow to send responses to. I do not know if this type of use-case will break something within the IncomingConnection logic.

这篇关于Akka-http在一个流程中处理来自不同连接的HttpRequest的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆