akka-stream相关内容
Supervision.Restart 和 Supervision.Resume 之间有什么区别? 这里是情况。我有3个元素来自 Source(List(1、2、3))。在 runForeach 中,如果元素为 2 ,则会引发异常。对于 Supervision.Restart ,我希望仅处理 1 。但是奇怪的是,我看到 3 陷入沉没。为什么呢我正在使用Akka 2.4.11 impo
..
我正在通过 via 调用方法,如下所示: myRawStr(id) .take(1) .via(myMethod(“ someString”,someSource) .zip(Source.fromIterator(()=> Iterator.from(1) )) .collect { ... } myMethod 返回类型 Flow [ByteStrin
..
我正在尝试从网站,它们表示如下: 有人可以举一个使用复合流的例子吗? 什么时候应该使用它? 解决方案 Flow.fromSinkAndSource 提供了一种方便的方法来组装由以下内容组成的流 接收器作为输入, source 作为未连接的输出,可以用下图(在API链接中提供)最好地说明: + ------------------------------------ --
..
我在mongo中有一个人集合,我想以流的形式遍历集合中的每个人,并为每个人调用一个执行api调用,更改模型并插入新集合的方法 看起来像这样: def processPeople( )(隐式m:实体化器):Future [Unit] = { val people来源:Source [Person,Future [State]] = collection.find(json())。
..
我有两个案例类 case class颜色(名称:字符串,阴影:List [Shade] = List.empty ) 案例类Shade(shadeName:字符串) 我也有解析器对于这两者: object ColorParser { def apply( s:String):Either [List [SomethingElse ],Color] = {
..
如何在 FlowShape Inlet 和 Outlet 的实例c $ c>?考虑以下示例 def油门流量[T](速率:FiniteDuration)= Flow.fromGraph(GraphDSL.create(){ => 导入GraphDSL.Implicits._ val标记= Source.tick(比率,速率,单位) val zip = builder.add(
..
如果流中元素的映射/处理失败,如何重试? 我尝试在实现器中设置决策程序,但不提供重试。它只是将异常映射到监督阶段。 谢谢 解决方案 来自akka-stream 2.4。 4 ,您可以使用 recoverWithRetries (docs 这里)。 这基本上可以让您提供 重试尝试次数 根据发生的故障切换到其他来源 它既可用于 Source ,又可用于 Flow
..
我正在尝试使用Akka HTTP构建Web套接字服务。我需要处理全部到达的严格消息,以及处理在m个多帧中到达的流式消息。我使用带有handleWebSocketMessages()的路由将Web套接字的处理传递给流。我的代码看起来像这样: val route:Route = get { handleWebSocketMessages( createFlow()) } def
..
文档 MergeLatest 的a>状态: MergeLatest会为某些元素发出的每个元素发出列表输入流,但仅在每个输入流发出至少一个元素之后。 我的问题是:可以绕过它吗? 例如,我们是否可以提供一个默认值,使其在从任何输入流中接收到至少一个元素后立即开始生成列表? 以下应该是新行为: (1, 0,0) (2,0,0) (2,1,0) (2,1,1) (2,1,2)
..
我有这样的信息流: def myStream [T:AS:MAT](来源:Source [T,未使用]):Future [Seq [T]] = { return source.runWith(Sink.seq) } def myMethod(colorStream:Source [Color,NotUsed] ){ val allColors = myStream(colorS
..
这是我之前的问题:通过响应流发送大文件 我设法使用FileIO.fromPath(Paths.get(file.toURI()))通过Akka流发送文件,并且工作正常。但是,我想在发送文件之前对其进行压缩和加密。我创建了一个方法,该方法打开FileInputStream,将其通过压缩流然后通过加密流进行路由,现在我想使用Akka流将其定向到套接字中。 文件- > FileInputSt
..
学习Akka流。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长的变化将它们分为时间组进行处理。 示例 case class记录(时间:整数,有效载荷:字符串) 如果传入流是 Record(1,“ a”) Record(1,“ k”) Record(1,“ k”) Record(1,“ a” ) Record(
..
以下Scala代码段似乎未返回: val队列= Source.queue [Unit](10,OverflowStrategy.fail) .throttle(1,1秒,1,ThrottleMode.shaping) .to(Sink.ignore) .run () Await.result( (1至15).map(_ => queue.offer(()))。last,
..
我正在编写的部分应用程序需要从客户端到服务器任意传输大文件(对于这个问题,我假设100-200 GB)。重要的是,接收方(服务器)没有存储此文件-它只是读取/检查流并将其发送到下一个点。因为我根本不需要整个文件,但是希望同时进行多个传输,所以我想最大程度地减少RAM使用率并消除磁盘使用率。我想处理1 MB的文件。 现在,服务器使用Spring Boot和Akka。 我的第二个想法是
..
我创建了一个图,以并行化具有相同输入的两个流。流程产生Future [Option [Entity]]。如果flowA失败,我想返回Future [None],但是恢复似乎没有被调用 val图:Flow [输入,([Future [Option [Entity]],Future [Option [Entity]]]),未使用] = Flow.fromGraph(GraphDSL.creat
..
我正在使用带有队列的主机级API 。 私有val(queueSource,connectionPool)= Source.queue [(HttpRequest,Promise [HttpResponse])] [queueSize,OverflowStrategy。 backpressure).async .viaMat(poolFlow)(Keep.both) .toMat( S
..
你们能解释一下如何在akka流中使用新的 groupBy 吗? 文档似乎毫无用处。 groupBy 用于返回(T,源),但现在不再返回。这是我的示例(我模仿了一个文档): Source(List( 1->“ 1a “,1->” 1b“,1->” 1c“, 2->” 2a“,2->” 2b“, 3->” 3a“, 3->“ 3b”,3->“ 3c”, 4->“ 4a”, 5->“
..
我的akka流继续学习。我想将我的akka-streams应用程序与 akka集成在一起-cluster和DistributedPubSubMediator 。 添加对Publish的支持相当简单,但是我遇到的“订阅”部分遇到了麻烦。 供参考,类型安全的示例: 类ChatClient(name:String)扩展Actor { val中介者= DistributedPu
..
我正在与我无法控制的Java库中的数据发布者合作。发布者库使用典型的回调设置。库代码中的某个位置(库是Java,但是我会在scala中描述简洁性): type DataType = ??? 特质DataConsumer { def onData(data:DataType):Unit } 该库的用户需要编写一个实现 onData 方法的类,并将该类传递给 Dat
..
我有一个基本的scala akka http CRUD应用程序。 我只想在任何时候(例如,实体)将实体ID和一些数据(如json)写入Kafka主题。创建/更新。 我正在查看 http://doc.akka.io/docs/akka-stream-kafka/current/producer.html ,但对于scala和akka来说是新手,并且不确定如何将其集成到我的应用程序中?
..