为什么阿卡流循环不会在此图中结束? [英] Why Akka streams cycle doesn't end in this graph?

查看:167
本文介绍了为什么阿卡流循环不会在此图中结束?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个图形,在下沉之前循环n次。我刚刚创建了满足我的要求的样本,但在下沉后不会结束,我真的不明白为什么。有人能给我启发吗?



谢谢。

  import akka.actor .ActorSystem 
导入akka.stream.scaladsl._
导入akka.stream。{ActorMaterializer,UniformFanOutShape}

导入scala.concurrent.Future

对象测试{
def main(args:Array [String]){
val忽略:Sink [Any,Future [Unit]] = Sink.ignore
val closed:RunnableGraph [Future [Unit ]] = FlowGraph.closed(ignore){implicit b =>
sink => {
import FlowGraph.Implicits._

val fileSource = Source.single((0,Array [String]()))
val merge = b.add(MergePreferred [ (Int,Array [String])](1).named(merge))
val afterMerge = Flow [(Int,Array [String])]。map {
e =>
println(合并后)
e
}
val broadcastArray:UniformFanOutShape [(Int,Array [String]),(Int,Array [String])] = b。 add(Broadcast [(Int,Array [String])](2).named(broadcastArray))
val toRetry = Flow [(Int,Array [String])]。filter {
case (r,s)=> {
println(retry+(r <3)++ r)
r < 3
}
} .map {
case(r,s)=> (r,s)=>(b,b,b),其中(r + 1,s)
}
val toSink = Flow [(Int,Array [String])] {
println(sink+(r> = 3)++ r)
r> = 3
}
}
merge.preferred< ;〜toRetry<〜broadcastArray
fileSource〜>合并〜> afterMerge〜> broadcastArray〜> toSink〜> ()


隐式val系统= ActorSystem()
隐式val _ = ActorMaterializer()
val run:Future [Unit] = closed.run()
import system.dispatcher
run.onComplete {
case _ => {
println(finished)
system.shutdown()
}
}
}
}`
解决方案

流从来没有完成,因为合并永远不会完成。

格式化图形结构后,它基本上看起来像:

  // //忽略首选是无关紧要的

fileSource〜>合并〜> afterMerge〜> broadcastArray〜> toSink〜>汇
合并<〜toRetry<〜broadcastArray

未完成的问题是根源于您的合并步骤:

  // 2输入合并

fileSource〜>合并
merge<〜toRetry

一旦fileSource发出了它的单个元素(即(0,Array.empty [String]))它发出一个完整的消息来合并。



但是,fileSource的完成消息在合并时被阻塞。从文档

lockquote
akka.stream.scaladsl.Merge优先



完成所有上游完成(eagerClose = false)或一个
上游完成(eagerClose = true)

合并将不会发送完成,直到其输入流的全部完成。

  // fileSource完成〜>合并
//合并<〜toRetry仍在运行

//完成fileSource +仍在运行toRetry =仍在运行合并

因此,merge会等到 toRetry 完成。但 toRetry 永远不会完成,因为它正在等待 merge 完成。



如果您希望在fileSource完成后完成特定的图形,那么只需设置 eagerClose = True ,一旦fileSource完成,将导致合并完成。例如:

  // //添加这个true | 
// V
val merge = b.add(MergePreferred [(Int,Array [String])](1,true).named(merge)



没有流循环



一个简单的解决方案存在于你的问题中。单 Flow.map 使用 tail递归函数:

  //注意:没有使用akka在这个实现中
$ b $ FileInputType =(Int,Array [String])

@ scala.annotation.tailrec
def recursiveRetry(fileInput:FileInputType):FileInputType =
fileInput match {
case(r,_)if r> = 3 => fileInput
case(r,a)=> recursiveRetry((r + 1,a))
}

您的信息流将被缩减为

  // ring-fenced akka code 

val re cursiveRetryFlow = Flow [FileInputType] map recursiveRetry

fileSource〜> recursiveRetryFlow〜> toSink〜>水槽

结果是一个更简洁的流 + 它避免了混合商业逻辑与阿卡片代码。这允许单元测试的重试功能完全独立于任何第三方库。您在流中嵌入的重试循环是业务逻辑。因此,混合实现与akka紧密耦合,无论好坏如何。

另外,在分离解决方案中,循环包含在尾递归函数中,即惯用的Scala


I would like to create a graph that loop n times before going to sink. I've just created this sample that fulfill my requirements but doesn't end after going to sink and I really don't understand why. Can someone enlighten me?

Thanks.

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, UniformFanOutShape}

    import scala.concurrent.Future

    object test {
      def main(args: Array[String]) {
        val ignore: Sink[Any, Future[Unit]] = Sink.ignore
        val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
          sink => {
            import FlowGraph.Implicits._

            val fileSource = Source.single((0, Array[String]()))
            val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
            val afterMerge = Flow[(Int, Array[String])].map {
              e =>
                println("after merge")
                e
            }
            val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
            val toRetry = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("retry " + (r < 3) + " " + r)
                r < 3
              }
            }.map {
              case (r, s) => (r + 1, s)
            }
            val toSink = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("sink " + (r >= 3) + " " + r)
                r >= 3
              }
            }
            merge.preferred <~ toRetry <~ broadcastArray
            fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
          }
        }
        implicit val system = ActorSystem()
        implicit val _ = ActorMaterializer()
        val run: Future[Unit] = closed.run()
        import system.dispatcher
        run.onComplete {
          case _ => {
            println("finished")
            system.shutdown()
          }
        }
      }
    }`

解决方案

The Stream is never completed because the merge never signals completion.

After formatting your graph structure, it basically looks like:

//ignoring the preferred which is inconsequential

fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
              merge <~ toRetry    <~ broadcastArray

The problem of non-completion is rooted in your merge step :

// 2 inputs into merge

fileSource ~> merge 
              merge <~ toRetry

Once the fileSource has emitted its single element (namely (0, Array.empty[String])) it sends out a complete message to merge.

However, the fileSource's completion message gets blocked at the merge. From the documentation:

akka.stream.scaladsl.MergePreferred

Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)

The merge will not send out complete until all of its input streams have completed.

// fileSource is complete ~> merge 
//                           merge <~ toRetry is still running

// complete fileSource + still running toRetry = still running merge

Therefore, merge will wait until toRetry also completes. But toRetry will never complete because it is waiting for merge to complete.

If you want your specific graph to complete after fileSource completes then just set eagerClose=True which will cause merge to complete once fileSource completes. E.g.:

//Add this true                                             |
//                                                          V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")

Without the Stream Cycle

A simpler solution exists for your problem. Just use a single Flow.map stage which utilizes a tail recursive function:

//Note: there is no use of akka in this implementation

type FileInputType = (Int, Array[String])

@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType = 
  fileInput match { 
    case (r,_) if r >= 3  => fileInput
    case (r,a)            => recursiveRetry((r+1, a))
  }    

Your stream would then be reduced to

//ring-fenced akka code

val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry

fileSource ~> recursiveRetryFlow ~> toSink ~> sink

The result is a cleaner stream + it avoids mixing "business logic" with akka code. This allows unit testing of the retry functionality completely independent from any third party library. The retry loop you have embedded in your stream is the "business logic". Therefore the mixed implementation is tightly coupled to akka going forward, for better or worse.

Also, in the segregated solution the cycle is contained in a tail recursive function, which is idiomatic Scala.

这篇关于为什么阿卡流循环不会在此图中结束?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆