为什么 Akka 流循环不在此图中结束? [英] Why Akka streams cycle doesn't end in this graph?
问题描述
我想创建一个在下沉之前循环 n 次的图.我刚刚创建了这个满足我要求的示例,但在下沉后并没有结束,我真的不明白为什么.有人能指教我吗?
I would like to create a graph that loop n times before going to sink. I've just created this sample that fulfill my requirements but doesn't end after going to sink and I really don't understand why. Can someone enlighten me?
谢谢.
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, UniformFanOutShape}
import scala.concurrent.Future
object test {
def main(args: Array[String]) {
val ignore: Sink[Any, Future[Unit]] = Sink.ignore
val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
sink => {
import FlowGraph.Implicits._
val fileSource = Source.single((0, Array[String]()))
val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
val afterMerge = Flow[(Int, Array[String])].map {
e =>
println("after merge")
e
}
val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
val toRetry = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("retry " + (r < 3) + " " + r)
r < 3
}
}.map {
case (r, s) => (r + 1, s)
}
val toSink = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("sink " + (r >= 3) + " " + r)
r >= 3
}
}
merge.preferred <~ toRetry <~ broadcastArray
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
}
}
implicit val system = ActorSystem()
implicit val _ = ActorMaterializer()
val run: Future[Unit] = closed.run()
import system.dispatcher
run.onComplete {
case _ => {
println("finished")
system.shutdown()
}
}
}
}`
推荐答案
Stream 永远不会完成,因为合并永远不会发出完成信号.
The Stream is never completed because the merge never signals completion.
格式化图形结构后,它基本上看起来像:
After formatting your graph structure, it basically looks like:
//ignoring the preferred which is inconsequential
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
merge <~ toRetry <~ broadcastArray
未完成的问题根源于您的合并步骤:
The problem of non-completion is rooted in your merge step :
// 2 inputs into merge
fileSource ~> merge
merge <~ toRetry
一旦 fileSource 发出了它的单个元素(即 (0, Array.empty[String])
),它就会发出一个 complete
消息进行合并.
Once the fileSource has emitted its single element (namely (0, Array.empty[String])
) it sends out a complete
message to merge.
但是,fileSource 的完成消息在合并时被阻止.来自 文档:
However, the fileSource's completion message gets blocked at the merge. From the documentation:
akka.stream.scaladsl.MergePreferred
当所有上游完成(eagerClose=false)或一个时完成上游完成(eagerClose=true)
Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
在所有的输入流完成之前,合并不会发送complete
.
The merge will not send out complete
until all of its input streams have completed.
// fileSource is complete ~> merge
// merge <~ toRetry is still running
// complete fileSource + still running toRetry = still running merge
因此,合并将等到 toRetry
也完成.但是 toRetry
永远不会完成,因为它正在等待 merge
完成.
Therefore, merge will wait until toRetry
also completes. But toRetry
will never complete because it is waiting for merge
to complete.
如果您希望您的特定图形在 fileSource 完成后完成,那么只需设置 eagerClose=True
这将在 fileSource 完成后导致合并完成.例如:
If you want your specific graph to complete after fileSource completes then just set eagerClose=True
which will cause merge to complete once fileSource completes. E.g.:
//Add this true |
// V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")
没有流循环
对于您的问题,有一个更简单的解决方案.只需使用一个使用 Flow.map 阶段="nofollow noreferrer">尾递归函数:
Without the Stream Cycle
A simpler solution exists for your problem. Just use a single Flow.map
stage which utilizes a tail recursive function:
//Note: there is no use of akka in this implementation
type FileInputType = (Int, Array[String])
@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType =
fileInput match {
case (r,_) if r >= 3 => fileInput
case (r,a) => recursiveRetry((r+1, a))
}
然后您的流将减少到
//ring-fenced akka code
val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry
fileSource ~> recursiveRetryFlow ~> toSink ~> sink
结果是一个更干净的流&它避免将业务逻辑"与 akka 代码混合.这允许重试功能的单元测试完全独立于任何第三方库.您嵌入到流中的重试循环是业务逻辑".因此,无论是好是坏,混合实现都与 akka 紧密耦合.
The result is a cleaner stream & it avoids mixing "business logic" with akka code. This allows unit testing of the retry functionality completely independent from any third party library. The retry loop you have embedded in your stream is the "business logic". Therefore the mixed implementation is tightly coupled to akka going forward, for better or worse.
此外,在隔离解决方案中,循环包含在尾递归函数中,即惯用的 Scala.
Also, in the segregated solution the cycle is contained in a tail recursive function, which is idiomatic Scala.
这篇关于为什么 Akka 流循环不在此图中结束?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!