使用源队列实现线程在akka-http中进行连接池安全吗? [英] Is connection pooling in akka-http using the source queue Implementation thread safe?

查看:114
本文介绍了使用源队列实现线程在akka-http中进行连接池安全吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

请参阅以下提到的实现:

Refering to the following implementation mentioned in:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host -level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

从多个线程提供队列http请求是否安全?
如果不是,那么实现此要求的最佳方法是什么?

Is it thread safe to offer the queue http requests from multiple threads ? If it isn't, what is the best way to implement such requirement ? using a dedicated actor perhaps ?

推荐答案

如@ frederic-a所述, SourceQueue 不是线程安全的解决方案。

As correctly stated by @frederic-a, SourceQueue is not a thread safe solution.

也许合适的解决方案是使用 MergeHub (有关更多详细信息,请参见 docs ) 。这有效地使您可以在两个阶段中运行图形。

Perhaps a fit solution would be to use a MergeHub(see docs for more details). This effectively allows you to run your graph in two stages.


  1. 从集线器到接收器(这会变成接收器)

  2. 将第1点实现的接收器分配给您的用户。 Sink 实际上是设计为可分发的,因此这是绝对安全的。

  1. from your hub to your sink (this materializes to a sink)
  2. distribute the sink materialized at point 1 to your users. Sinks are actually designed to be distributed, so this is perfectly safe.

根据 MergeHub 行为,该解决方案在背压方面是安全的。 / p>

This solution would be safe backpressure-wise, as per MergeHub behaviour


如果消费者无法跟上,那么所有生产者都会承受
的背压。

If the consumer cannot keep up then all of the producers are backpressured.

下面的代码示例:

val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
  MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
  .via(poolClientFlow)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p))    => p.failure(e)
  }))(Keep.left)
  .run()

// on the user threads

val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
source.runWith(reqSink)

这篇关于使用源队列实现线程在akka-http中进行连接池安全吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆