akka-stream相关内容

修改通过以接受返回Future的方法

我正在通过 via 调用方法,如下所示: myRawStr(id) .take(1) .via(myMethod(“ someString”,someSource) .zip(Source.fromIterator(()=> Iterator.from(1) )) .collect { ... } myMethod 返回类型 Flow [ByteStrin ..
发布时间:2020-06-03 18:49:47 其他开发

合成流的目的是什么(来自Sink和Source)?

我正在尝试从网站,它们表示如下: 有人可以举一个使用复合流的例子吗? 什么时候应该使用它? 解决方案 Flow.fromSinkAndSource 提供了一种方便的方法来组装由以下内容组成的流 接收器作为输入, source 作为未连接的输出,可以用下图(在API链接中提供)最好地说明: + ------------------------------------ -- ..
发布时间:2020-06-03 18:49:43 其他开发

使用Akka流查看mongo集合

我在mongo中有一个人集合,我想以流的形式遍历集合中的每个人,并为每个人调用一个执行api调用,更改模型并插入新集合的方法 看起来像这样: def processPeople( )(隐式m:实体化器):Future [Unit] = { val people来源:Source [Person,Future [State]] = collection.find(json())。 ..
发布时间:2020-06-03 18:49:34 其他开发

如何将一个源映射到另一个?

我有两个案例类 case class颜色(名称:字符串,阴影:List [Shade] = List.empty ) 案例类Shade(shadeName:字符串) 我也有解析器对于这两者: object ColorParser { def apply( s:String):Either [List [SomethingElse ],Color] = { ..
发布时间:2020-06-03 18:49:30 其他开发

重试Akka流图

如果流中元素的映射/处理失败,如何重试? 我尝试在实现器中设置决策程序,但不提供重试。它只是将异常映射到监督阶段。 谢谢 解决方案 来自akka-stream 2.4。 4 ,您可以使用 recoverWithRetries (docs 这里)。 这基本上可以让您提供 重试尝试次数 根据发生的故障切换到其他来源 它既可用于 Source ,又可用于 Flow ..
发布时间:2020-06-03 18:48:51 其他开发

在Akka中同时使用严格和流式WebSocket消息

我正在尝试使用Akka HTTP构建Web套接字服务。我需要处理全部到达的严格消息,以及处理在m个多帧中到达的流式消息。我使用带有handleWebSocketMessages()的路由将Web套接字的处理传递给流。我的代码看起来像这样: val route:Route = get { handleWebSocketMessages( createFlow()) } def ..
发布时间:2020-06-03 18:48:47 其他开发

MergeLatest的默认值

文档 MergeLatest 的a>状态: MergeLatest会为某些元素发出的每个元素发出列表输入流,但仅在每个输入流发出至少一个元素之后。 我的问题是:可以绕过它吗? 例如,我们是否可以提供一个默认值,使其在从任何输入流中接收到至少一个元素后立即开始生成列表? 以下应该是新行为: (1, 0,0) (2,0,0) (2,1,0) (2,1,1) (2,1,2) ..
发布时间:2020-06-03 18:48:08 其他开发

如何将流的内容放入val?

我有这样的信息流: def myStream [T:AS:MAT](来源:Source [T,未使用]):Future [Seq [T]] = { return source.runWith(Sink.seq) } def myMethod(colorStream:Source [Color,NotUsed] ){ val allColors = myStream(colorS ..
发布时间:2020-06-03 18:48:02 其他开发

通过Akka / Spring流从InputStream发送数据

这是我之前的问题:通过响应流发送大文件 我设法使用FileIO.fromPath(Paths.get(file.toURI()))通过Akka流发送文件,并且工作正常。但是,我想在发送文件之前对其进行压缩和加密。我创建了一个方法,该方法打开FileInputStream,将其通过压缩流然后通过加密流进行路由,现在我想使用Akka流将其定向到套接字中。 文件- > FileInputSt ..

Akka流分批

学习Akka流。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长的变化将它们分为时间组进行处理。 示例 case class记录(时间:整数,有效载荷:字符串) 如果传入流是 Record(1,“ a”) Record(1,“ k”) Record(1,“ k”) Record(1,“ a” ) Record( ..
发布时间:2020-06-03 18:47:52 其他开发

通过响应流发送大文件

我正在编写的部分应用程序需要从客户端到服务器任意传输大文件(对于这个问题,我假设100-200 GB)。重要的是,接收方(服务器)没有存储此文件-它只是读取/检查流并将其发送到下一个点。因为我根本不需要整个文件,但是希望同时进行多个传输,所以我想最大程度地减少RAM使用率并消除磁盘​​使用率。我想处理1 MB的文件。 现在,服务器使用Spring Boot和Akka。 我的第二个想法是 ..
发布时间:2020-06-03 18:46:24 其他开发

Akka流图恢复问题

我创建了一个图,以并行化具有相同输入的两个流。流程产生Future [Option [Entity]]。如果flowA失败,我想返回Future [None],但是恢复似乎没有被调用 val图:Flow [输入,([Future [Option [Entity]],Future [Option [Entity]]]),未使用] = Flow.fromGraph(GraphDSL.creat ..
发布时间:2020-06-03 18:46:14 其他开发

如何启用Source.Queue背压

我正在使用带有队列的主机级API 。 私有val(queueSource,connectionPool)= Source.queue [(HttpRequest,Promise [HttpResponse])] [queueSize,OverflowStrategy。 backpressure).async .viaMat(poolFlow)(Keep.both) .toMat( S ..
发布时间:2020-06-03 18:45:45 其他开发

如何使用SubFlows对已排序流的项目进行分组?

你们能解释一下如何在akka流中使用新的 groupBy 吗? 文档似乎毫无用处。 groupBy 用于返回(T,源),但现在不再返回。这是我的示例(我模仿了一个文档): Source(List( 1->“ 1a “,1->” 1b“,1->” 1c“, 2->” 2a“,2->” 2b“, 3->” 3a“, 3->“ 3b”,3->“ 3c”, 4->“ 4a”, 5->“ ..
发布时间:2020-06-03 18:45:28 其他开发

带有akka集群的akka​​流

我的akka​​流继续学习。我想将我的akka​​-streams应用程序与 akka集成在一起-cluster和DistributedPubSubMediator 。 添加对Publish的支持相当简单,但是我遇到的“订阅”部分遇到了麻烦。 供参考,类型安全的示例: 类ChatClient(name:String)扩展Actor { val中介者= DistributedPu ..
发布时间:2020-06-03 18:44:50 其他开发

将回调方法实现转换为akka流Source

我正在与我无法控制的Java库中的数据发布者合作。发布者库使用典型的回调设置。库代码中的某个位置(库是Java,但是我会在scala中描述简洁性): type DataType = ??? 特质DataConsumer { def onData(data:DataType):Unit } 该库的用户需要编写一个实现 onData 方法的类,并将该类传递给 Dat ..
发布时间:2020-06-03 18:44:28 其他开发

如何将Akka Streams Kafka(reactive-kafka)集成到Akka HTTP应用程序中?

我有一个基本的scala akka http CRUD应用程序。 我只想在任何时候(例如,实体)将​​实体ID和一些数据(如json)写入Kafka主题。创建/更新。 我正在查看 http://doc.akka.io/docs/akka-stream-kafka/current/producer.html ,但对于scala和akka来说是新手,并且不确定如何将其集成到我的应用程序中? ..
发布时间:2020-06-03 18:43:47 其他开发