为什么阿卡流循环不会在此图中结束? [英] Why Akka streams cycle doesn't end in this graph?
问题描述
我想创建一个图形,在下沉之前循环n次。我刚刚创建了满足我的要求的样本,但在下沉后不会结束,我真的不明白为什么。有人能给我启发吗?
谢谢。
import akka.actor .ActorSystem
导入akka.stream.scaladsl._
导入akka.stream。{ActorMaterializer,UniformFanOutShape}
导入scala.concurrent.Future
对象测试{
def main(args:Array [String]){
val忽略:Sink [Any,Future [Unit]] = Sink.ignore
val closed:RunnableGraph [Future [Unit ]] = FlowGraph.closed(ignore){implicit b =>
sink => {
import FlowGraph.Implicits._
val fileSource = Source.single((0,Array [String]()))
val merge = b.add(MergePreferred [ (Int,Array [String])](1).named(merge))
val afterMerge = Flow [(Int,Array [String])]。map {
e =>
println(合并后)
e
}
val broadcastArray:UniformFanOutShape [(Int,Array [String]),(Int,Array [String])] = b。 add(Broadcast [(Int,Array [String])](2).named(broadcastArray))
val toRetry = Flow [(Int,Array [String])]。filter {
case (r,s)=> {
println(retry+(r <3)++ r)
r < 3
}
} .map {
case(r,s)=> (r,s)=>(b,b,b),其中(r + 1,s)
}
val toSink = Flow [(Int,Array [String])] {
println(sink+(r> = 3)++ r)
r> = 3
}
}
merge.preferred< ;〜toRetry<〜broadcastArray
fileSource〜>合并〜> afterMerge〜> broadcastArray〜> toSink〜> ()
隐式val系统= ActorSystem()
隐式val _ = ActorMaterializer()
val run:Future [Unit] = closed.run()
import system.dispatcher
run.onComplete {
case _ => {
println(finished)
system.shutdown()
}
}
}
}`
$ c $ 解决方案
流从来没有完成,因为合并永远不会完成。
格式化图形结构后,它基本上看起来像:
// //忽略首选是无关紧要的
fileSource〜>合并〜> afterMerge〜> broadcastArray〜> toSink〜>汇
合并<〜toRetry<〜broadcastArray
未完成的问题是根源于您的合并步骤:
// 2输入合并
fileSource〜>合并
merge<〜toRetry
一旦fileSource发出了它的单个元素(即(0,Array.empty [String])
)它发出一个完整的
消息来合并。
但是,fileSource的完成消息在合并时被阻塞。从文档:
lockquote
akka.stream.scaladsl.Merge优先
完成所有上游完成(eagerClose = false)或一个
上游完成(eagerClose = true)
合并将不会发送完成
,直到其输入流的全部完成。
// fileSource完成〜>合并
//合并<〜toRetry仍在运行
//完成fileSource +仍在运行toRetry =仍在运行合并
因此,merge会等到 toRetry
完成。但 toRetry
永远不会完成,因为它正在等待 merge
完成。
如果您希望在fileSource完成后完成特定的图形,那么只需设置 eagerClose = True
,一旦fileSource完成,将导致合并完成。例如:
// //添加这个true |
// V
val merge = b.add(MergePreferred [(Int,Array [String])](1,true).named(merge)
没有流循环
一个简单的解决方案存在于你的问题中。单 Flow.map
使用 tail递归函数:
//注意:没有使用akka在这个实现中
$ b $ FileInputType =(Int,Array [String])
@ scala.annotation.tailrec
def recursiveRetry(fileInput:FileInputType):FileInputType =
fileInput match {
case(r,_)if r> = 3 => fileInput
case(r,a)=> recursiveRetry((r + 1,a))
}
您的信息流将被缩减为
// ring-fenced akka code
val re cursiveRetryFlow = Flow [FileInputType] map recursiveRetry
fileSource〜> recursiveRetryFlow〜> toSink〜>水槽
结果是一个更简洁的流 +
它避免了混合商业逻辑与阿卡片代码。这允许单元测试的重试功能完全独立于任何第三方库。您在流中嵌入的重试循环是业务逻辑。因此,混合实现与akka紧密耦合,无论好坏如何。
另外,在分离解决方案中,循环包含在尾递归函数中,即惯用的Scala 。
I would like to create a graph that loop n times before going to sink. I've just created this sample that fulfill my requirements but doesn't end after going to sink and I really don't understand why. Can someone enlighten me?
Thanks.
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, UniformFanOutShape}
import scala.concurrent.Future
object test {
def main(args: Array[String]) {
val ignore: Sink[Any, Future[Unit]] = Sink.ignore
val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
sink => {
import FlowGraph.Implicits._
val fileSource = Source.single((0, Array[String]()))
val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
val afterMerge = Flow[(Int, Array[String])].map {
e =>
println("after merge")
e
}
val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
val toRetry = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("retry " + (r < 3) + " " + r)
r < 3
}
}.map {
case (r, s) => (r + 1, s)
}
val toSink = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("sink " + (r >= 3) + " " + r)
r >= 3
}
}
merge.preferred <~ toRetry <~ broadcastArray
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
}
}
implicit val system = ActorSystem()
implicit val _ = ActorMaterializer()
val run: Future[Unit] = closed.run()
import system.dispatcher
run.onComplete {
case _ => {
println("finished")
system.shutdown()
}
}
}
}`
解决方案 The Stream is never completed because the merge never signals completion.
After formatting your graph structure, it basically looks like:
//ignoring the preferred which is inconsequential
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
merge <~ toRetry <~ broadcastArray
The problem of non-completion is rooted in your merge step :
// 2 inputs into merge
fileSource ~> merge
merge <~ toRetry
Once the fileSource has emitted its single element (namely (0, Array.empty[String])
) it sends out a complete
message to merge.
However, the fileSource's completion message gets blocked at the merge. From the documentation:
akka.stream.scaladsl.MergePreferred
Completes when all upstreams complete (eagerClose=false) or one
upstream completes (eagerClose=true)
The merge will not send out complete
until all of its input streams have completed.
// fileSource is complete ~> merge
// merge <~ toRetry is still running
// complete fileSource + still running toRetry = still running merge
Therefore, merge will wait until toRetry
also completes. But toRetry
will never complete because it is waiting for merge
to complete.
If you want your specific graph to complete after fileSource completes then just set eagerClose=True
which will cause merge to complete once fileSource completes. E.g.:
//Add this true |
// V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")
Without the Stream Cycle
A simpler solution exists for your problem. Just use a single Flow.map
stage which utilizes a tail recursive function:
//Note: there is no use of akka in this implementation
type FileInputType = (Int, Array[String])
@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType =
fileInput match {
case (r,_) if r >= 3 => fileInput
case (r,a) => recursiveRetry((r+1, a))
}
Your stream would then be reduced to
//ring-fenced akka code
val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry
fileSource ~> recursiveRetryFlow ~> toSink ~> sink
The result is a cleaner stream +
it avoids mixing "business logic" with akka code. This allows unit testing of the retry functionality completely independent from any third party library. The retry loop you have embedded in your stream is the "business logic". Therefore the mixed implementation is tightly coupled to akka going forward, for better or worse.
Also, in the segregated solution the cycle is contained in a tail recursive function, which is idiomatic Scala.
这篇关于为什么阿卡流循环不会在此图中结束?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!