akka-stream相关内容

如何使用反压溢出策略创建Akka-Http客户端?

我有未定数量的Akka-http客户端流从http服务下载数据。我使用Akka-http主机级连接池,因为我希望自定义池,因为有长时间运行的请求通过它。 因为客户端的数量是未定义的和动态的,所以我不知道如何配置连接池(max-open-request/max-connections)。此外,我可能希望连接池较小(少于客户端数量),以不损害带宽。 因此,我想设置一个客户端流,以便对池的新 ..
发布时间:2022-02-27 18:51:36 其他开发

Akka Stream Source.queue 的背压策略不起作用

我试图理解为什么下面的代码片段正在做它正在做的事情.我会认为,因为接收器不能比源生成内容更快地产生需求,那么我会收到丢弃的消息以响应某些报价(溢出策略设置为丢弃缓冲区)以及错误和队列关闭消息在自毁片之后. 片段: 打包游乐场导入 java.time.LocalDateTime导入 java.util.concurrent.atomic.AtomicInteger导入 akka.actor. ..
发布时间:2021-11-26 22:51:37 其他开发

如何在 Akka Stream 中记录流速?

我有一个带有单个流/图的 Akka Stream 应用程序.我想测量源头的流速并每 5 秒记录一次,例如“在过去 5 秒内收到 3 条消息".我试过了, someOtherFlow.groupedWithin(Integer.MAX_VALUE, 5 秒).runForeach(seq =>log.debug(s"在过去 5 秒内收到 ${seq.length} 条消息")) 但只在有消息时输 ..
发布时间:2021-11-26 22:51:29 其他开发

使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams)

我尝试使用 Akka 2.4.3 将 TCP 流重定向/转发到另一个接收器.该程序应该打开一个服务器套接字,侦听传入的连接,然后使用 tcp 流.我们的发件人不期望/接受我们的回复,所以我们从不发回任何东西——我们只是消费流.在对 tcp 流进行成帧后,我们需要将字节转换为更有用的内容并将其发送到 Sink. 到目前为止,我尝试了以下操作,但我尤其在如何不将 tcp 数据包发送回发件人并正确 ..
发布时间:2021-11-26 22:51:18 其他开发

使用 Akka-Streams HTTP 将整个 HttpResponse 主体作为字符串获取

我正在尝试了解如何使用新的 akka.http 库.我想向服务器发送一个 http 请求并将整个响应正文作为单个字符串读取,以生成一个 Source[String,?]. 这是迄今为止我能够产生的最佳解决方案: def get(型号ID:字符串,池:流[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]): ..
发布时间:2021-11-26 22:51:07 其他开发

将 Akka Stream Source 一分为二

我有一个 Akka Streams Source,我想根据谓词将其拆分为两个源. 例如有一个来源(类型被有意简化): val source: Source[Either[Throwable, String], NotUsed] = ??? 还有两种方法: def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ..
发布时间:2021-11-26 22:50:55 其他开发

使用 Akka Streams 读取 CSV 文件

我正在读取一个 csv 文件.我正在使用 Akka Streams 来执行此操作,以便我可以创建要在每一行上执行的操作图.我已经启动并运行了以下玩具示例. def main(args: Array[String]): Unit = {隐式 val 系统 = ActorSystem("MyAkkaSystem")隐式 val materializer = ActorMaterializer()v ..
发布时间:2021-11-26 22:50:48 其他开发

如何从递归生成值的流创建 akka-stream 源?

我需要遍历一个形状像树的 API.例如,目录结构或讨论线程.它可以通过以下流程建模: type ItemId = Int类型数据 = 字符串案例类项目(数据:数据,孩子:列表[ItemId])def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString//0 =>[1, 9]//1 =>[10, 19]//2 = ..
发布时间:2021-11-26 22:50:39 其他开发

端到端反应式流 RESTful 服务(又名 Back-Pressure over HTTP)

我在网上尝试澄清这个问题有一段时间没有成功,所以我会尝试在这里提问. 我想找到一些资源或示例,其中展示了如何构建端到端的完全背压式 REST 服务 + 客户端.我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他中),我将拥有(并且能够“可视化")整个过程中处理的背压构建的 REST 服务器,例如使用 Akka-Ht ..
发布时间:2021-11-26 22:50:28 其他开发

Akka Stream - 定时器或调度程序,如 CRON

我在 Scala 上使用 Akka Stream.我想设置一个每 24:00 运行的调度程序.我试图寻找它.但我找不到我想做的事.你能告诉我怎么写代码吗? 解决方案 在 Akka scheduler 中使用构建,见:http://doc.akka.io/docs/akka/current/scala/scheduler.html 您可以像这样使用调度程序: system.sched ..
发布时间:2021-11-26 22:50:21 其他开发

Akka Stream - 根据流中的元素选择接收器

我正在使用 Akka 流创建一个简单的消息传递服务.该服务就像邮件传递一样,其中来自源的元素包括 destination 和 content,例如: case class Message(destination: String, content: String) 并且服务应该根据 destination 字段将消息传递到适当的接收器.我创建了一个 DeliverySink 类来让它有一个名字: ..
发布时间:2021-11-26 22:50:10 其他开发

使用 slick 的 3.0.0 流结果和 Postgresql 的正确方法是什么?

我想弄清楚如何使用流畅的流媒体.我使用带有 postgres 驱动程序的 slick 3.0.0 情况如下:服务器必须将客户端数据序列拆分为受大小(以字节为单位)限制的块.所以,我写了以下巧妙的查询: val 序列 = TableQuery[序列]def find(userId: Long, timestamp: Long) =sequence.filter(s ⇒ s.userId == ..
发布时间:2021-11-26 22:49:59 其他开发

Akka Stream 中的 Via/ViaMat/to/toMat

有人能解释清楚这 4 种方法有什么区别吗?什么时候使用更合适?另外一般来说这组方法的名称是什么?是否有更多方法可以完成相同的工作?scaladoc 的链接也有帮助. -D- 解决方案 所有这些方法都是将两个流合并为一个流所必需的.例如,您可以从 Source 和 Flow 中创建一个 Source,或者您可以创建一个 Sink一个Flow和一个Sink,或者你可以用两个Flow创建一 ..
发布时间:2021-11-26 22:49:41 其他开发

在条件下停止 Akka Streams 的正确方法

我已经成功使用了 FileIO 流式传输文件的内容,计算每一行的一些转换并聚合/减少结果. 现在我有一个非常具体的用例,我想在达到条件时停止流,这样就没有必要读取整个文件,但进程会尽快完成.实现这一目标的推荐方法是什么? 解决方案 如果停止条件是“在流的外面" 有一个名为 KillSwitch 的高级构建块,您可以使用它来执行此操作:http://doc.akka.io/jap ..
发布时间:2021-11-26 22:49:21 其他开发

Akka 流:读取多个文件

我有一个文件列表.我要: 将所有这些作为单一来源读取. 文件应该按顺序读取.(无循环) 任何时候都不应该要求任何文件完全在内存中. 从文件读取错误应该折叠流. 感觉这应该可行:(Scala,akka-streams v2.4.7) val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromP ..
发布时间:2021-11-26 22:49:14 其他开发

如何将 Source 动态添加到现有 Graph?

有什么可以替代动态改变运行图?这是我的情况.我有将文章摄取到数据库中的图表.文章来自 3 个不同格式的插件.因此我有几个流程 val converterFlow1: Flow[ImpArticle, Article, NotUsed]val converterFlow2: Flow[NewsArticle, Article, NotUsed]val sinkDB:​​Sink[文章,未来[完成] ..
发布时间:2021-11-26 22:49:04 其他开发

基于 akka 流条件的替代流

有一个带有自定义流的流,在某个阶段我想拆分流并有两个替代数据处理,稍后将再次合并. 例如 ->F3 ->F6源 ->F1 ->F2>合并 ->下沉->F4 ->F5 F2 应该有一个条件,如果数据包含格式A,那么它应该去流程F3,否则去F4. 据我所知,每个流在每个方向上只能有一个端口(如果是双向的,则有两个)——那么我如何支持这样的流? 解决方案 您可以使用 Broad ..
发布时间:2021-11-26 22:48:55 其他开发

Akka Stream 的 Keep right/left/both 如何导致不同的输出?

我正在尝试了解 Keep 在 Akka 流中的工作方式.阅读 保持在 akka 流中意味着什么 中的答案,我明白它有助于控制我们从具体化器的左侧/右侧/两侧获得结果.但是,我仍然无法构建一个示例,因为我可以更改左/右的值并获得不同的结果. 例如 隐式val系统:ActorSystem = ActorSystem(“Playground")隐式 val 物化器:ActorMaterializ ..
发布时间:2021-11-26 22:48:43 其他开发