akka-stream相关内容
我有未定数量的Akka-http客户端流从http服务下载数据。我使用Akka-http主机级连接池,因为我希望自定义池,因为有长时间运行的请求通过它。 因为客户端的数量是未定义的和动态的,所以我不知道如何配置连接池(max-open-request/max-connections)。此外,我可能希望连接池较小(少于客户端数量),以不损害带宽。 因此,我想设置一个客户端流,以便对池的新
..
我正在尝试理解我在下面看到的错误,并学习如何修复它. 找不到参数物化器的隐式值:akka.Stream.Materializerval fut: Future[Result] = action.apply(fakeRequest).run^方法运行的参数不足(隐式实现器:akka.stream.Materializer)scala.concurrent.Future[play.api.mvc.
..
我试图理解为什么下面的代码片段正在做它正在做的事情.我会认为,因为接收器不能比源生成内容更快地产生需求,那么我会收到丢弃的消息以响应某些报价(溢出策略设置为丢弃缓冲区)以及错误和队列关闭消息在自毁片之后. 片段: 打包游乐场导入 java.time.LocalDateTime导入 java.util.concurrent.atomic.AtomicInteger导入 akka.actor.
..
我有一个带有单个流/图的 Akka Stream 应用程序.我想测量源头的流速并每 5 秒记录一次,例如“在过去 5 秒内收到 3 条消息".我试过了, someOtherFlow.groupedWithin(Integer.MAX_VALUE, 5 秒).runForeach(seq =>log.debug(s"在过去 5 秒内收到 ${seq.length} 条消息")) 但只在有消息时输
..
我尝试使用 Akka 2.4.3 将 TCP 流重定向/转发到另一个接收器.该程序应该打开一个服务器套接字,侦听传入的连接,然后使用 tcp 流.我们的发件人不期望/接受我们的回复,所以我们从不发回任何东西——我们只是消费流.在对 tcp 流进行成帧后,我们需要将字节转换为更有用的内容并将其发送到 Sink. 到目前为止,我尝试了以下操作,但我尤其在如何不将 tcp 数据包发送回发件人并正确
..
我正在尝试了解如何使用新的 akka.http 库.我想向服务器发送一个 http 请求并将整个响应正文作为单个字符串读取,以生成一个 Source[String,?]. 这是迄今为止我能够产生的最佳解决方案: def get(型号ID:字符串,池:流[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]):
..
我有一个 Akka Streams Source,我想根据谓词将其拆分为两个源. 例如有一个来源(类型被有意简化): val source: Source[Either[Throwable, String], NotUsed] = ??? 还有两种方法: def handleSuccess(source: Source[String, NotUsed]): Future[Unit] =
..
我正在读取一个 csv 文件.我正在使用 Akka Streams 来执行此操作,以便我可以创建要在每一行上执行的操作图.我已经启动并运行了以下玩具示例. def main(args: Array[String]): Unit = {隐式 val 系统 = ActorSystem("MyAkkaSystem")隐式 val materializer = ActorMaterializer()v
..
我需要遍历一个形状像树的 API.例如,目录结构或讨论线程.它可以通过以下流程建模: type ItemId = Int类型数据 = 字符串案例类项目(数据:数据,孩子:列表[ItemId])def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString//0 =>[1, 9]//1 =>[10, 19]//2 =
..
我在网上尝试澄清这个问题有一段时间没有成功,所以我会尝试在这里提问. 我想找到一些资源或示例,其中展示了如何构建端到端的完全背压式 REST 服务 + 客户端.我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他中),我将拥有(并且能够“可视化")整个过程中处理的背压构建的 REST 服务器,例如使用 Akka-Ht
..
我在 Scala 上使用 Akka Stream.我想设置一个每 24:00 运行的调度程序.我试图寻找它.但我找不到我想做的事.你能告诉我怎么写代码吗? 解决方案 在 Akka scheduler 中使用构建,见:http://doc.akka.io/docs/akka/current/scala/scheduler.html 您可以像这样使用调度程序: system.sched
..
我正在使用 Akka 流创建一个简单的消息传递服务.该服务就像邮件传递一样,其中来自源的元素包括 destination 和 content,例如: case class Message(destination: String, content: String) 并且服务应该根据 destination 字段将消息传递到适当的接收器.我创建了一个 DeliverySink 类来让它有一个名字:
..
我想弄清楚如何使用流畅的流媒体.我使用带有 postgres 驱动程序的 slick 3.0.0 情况如下:服务器必须将客户端数据序列拆分为受大小(以字节为单位)限制的块.所以,我写了以下巧妙的查询: val 序列 = TableQuery[序列]def find(userId: Long, timestamp: Long) =sequence.filter(s ⇒ s.userId ==
..
为什么会出现异常 import akka.actor.ActorSystem导入 akka.stream.ActorMaterializer导入 akka.stream.scaladsl.Source对象测试异常处理{def main(args: Array[String]): Unit = {隐式 val actorSystem = ActorSystem()隐式 val materializ
..
有人能解释清楚这 4 种方法有什么区别吗?什么时候使用更合适?另外一般来说这组方法的名称是什么?是否有更多方法可以完成相同的工作?scaladoc 的链接也有帮助. -D- 解决方案 所有这些方法都是将两个流合并为一个流所必需的.例如,您可以从 Source 和 Flow 中创建一个 Source,或者您可以创建一个 Sink一个Flow和一个Sink,或者你可以用两个Flow创建一
..
我已经成功使用了 FileIO 流式传输文件的内容,计算每一行的一些转换并聚合/减少结果. 现在我有一个非常具体的用例,我想在达到条件时停止流,这样就没有必要读取整个文件,但进程会尽快完成.实现这一目标的推荐方法是什么? 解决方案 如果停止条件是“在流的外面" 有一个名为 KillSwitch 的高级构建块,您可以使用它来执行此操作:http://doc.akka.io/jap
..
我有一个文件列表.我要: 将所有这些作为单一来源读取. 文件应该按顺序读取.(无循环) 任何时候都不应该要求任何文件完全在内存中. 从文件读取错误应该折叠流. 感觉这应该可行:(Scala,akka-streams v2.4.7) val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromP
..
有什么可以替代动态改变运行图?这是我的情况.我有将文章摄取到数据库中的图表.文章来自 3 个不同格式的插件.因此我有几个流程 val converterFlow1: Flow[ImpArticle, Article, NotUsed]val converterFlow2: Flow[NewsArticle, Article, NotUsed]val sinkDB:Sink[文章,未来[完成]
..
有一个带有自定义流的流,在某个阶段我想拆分流并有两个替代数据处理,稍后将再次合并. 例如 ->F3 ->F6源 ->F1 ->F2>合并 ->下沉->F4 ->F5 F2 应该有一个条件,如果数据包含格式A,那么它应该去流程F3,否则去F4. 据我所知,每个流在每个方向上只能有一个端口(如果是双向的,则有两个)——那么我如何支持这样的流? 解决方案 您可以使用 Broad
..
我正在尝试了解 Keep 在 Akka 流中的工作方式.阅读 保持在 akka 流中意味着什么 中的答案,我明白它有助于控制我们从具体化器的左侧/右侧/两侧获得结果.但是,我仍然无法构建一个示例,因为我可以更改左/右的值并获得不同的结果. 例如 隐式val系统:ActorSystem = ActorSystem(“Playground")隐式 val 物化器:ActorMaterializ
..