为什么 Akka 流循环不在此图中结束? [英] Why Akka streams cycle doesn't end in this graph?

查看:15
本文介绍了为什么 Akka 流循环不在此图中结束?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个在下沉之前循环 n 次的图.我刚刚创建了这个满足我要求的示例,但在下沉后并没有结束,我真的不明白为什么.有人能指教我吗?

I would like to create a graph that loop n times before going to sink. I've just created this sample that fulfill my requirements but doesn't end after going to sink and I really don't understand why. Can someone enlighten me?

谢谢.

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, UniformFanOutShape}

    import scala.concurrent.Future

    object test {
      def main(args: Array[String]) {
        val ignore: Sink[Any, Future[Unit]] = Sink.ignore
        val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
          sink => {
            import FlowGraph.Implicits._

            val fileSource = Source.single((0, Array[String]()))
            val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
            val afterMerge = Flow[(Int, Array[String])].map {
              e =>
                println("after merge")
                e
            }
            val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
            val toRetry = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("retry " + (r < 3) + " " + r)
                r < 3
              }
            }.map {
              case (r, s) => (r + 1, s)
            }
            val toSink = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("sink " + (r >= 3) + " " + r)
                r >= 3
              }
            }
            merge.preferred <~ toRetry <~ broadcastArray
            fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
          }
        }
        implicit val system = ActorSystem()
        implicit val _ = ActorMaterializer()
        val run: Future[Unit] = closed.run()
        import system.dispatcher
        run.onComplete {
          case _ => {
            println("finished")
            system.shutdown()
          }
        }
      }
    }`

推荐答案

Stream 永远不会完成,因为合并永远不会发出完成信号.

The Stream is never completed because the merge never signals completion.

格式化图形结构后,它基本上看起来像:

After formatting your graph structure, it basically looks like:

//ignoring the preferred which is inconsequential

fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
              merge <~ toRetry    <~ broadcastArray

未完成的问题根源于您的合并步骤:

The problem of non-completion is rooted in your merge step :

// 2 inputs into merge

fileSource ~> merge 
              merge <~ toRetry

一旦 fileSource 发出了它的单个元素(即 (0, Array.empty[String])),它就会发出一个 complete 消息进行合并.

Once the fileSource has emitted its single element (namely (0, Array.empty[String])) it sends out a complete message to merge.

但是,fileSource 的完成消息在合并时被阻止.来自 文档:

However, the fileSource's completion message gets blocked at the merge. From the documentation:

akka.stream.scaladsl.MergePreferred

当所有上游完成(eagerClose=false)或一个时完成上游完成(eagerClose=true)

Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)

所有的输入流完成之前,合并不会发送complete.

The merge will not send out complete until all of its input streams have completed.

// fileSource is complete ~> merge 
//                           merge <~ toRetry is still running

// complete fileSource + still running toRetry = still running merge

因此,合并将等到 toRetry 也完成.但是 toRetry 永远不会完成,因为它正在等待 merge 完成.

Therefore, merge will wait until toRetry also completes. But toRetry will never complete because it is waiting for merge to complete.

如果您希望您的特定图形在 fileSource 完成后完成,那么只需设置 eagerClose=True 这将在 fileSource 完成后导致合并完成.例如:

If you want your specific graph to complete after fileSource completes then just set eagerClose=True which will cause merge to complete once fileSource completes. E.g.:

//Add this true                                             |
//                                                          V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")

没有流循环

对于您的问题,有一个更简单的解决方案.只需使用一个使用 Flow.map 阶段="nofollow noreferrer">尾递归函数:

Without the Stream Cycle

A simpler solution exists for your problem. Just use a single Flow.map stage which utilizes a tail recursive function:

//Note: there is no use of akka in this implementation

type FileInputType = (Int, Array[String])

@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType = 
  fileInput match { 
    case (r,_) if r >= 3  => fileInput
    case (r,a)            => recursiveRetry((r+1, a))
  }    

然后您的流将减少到

//ring-fenced akka code

val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry

fileSource ~> recursiveRetryFlow ~> toSink ~> sink

结果是一个更干净的流&它避免将业务逻辑"与 akka 代码混合.这允许重试功能的单元测试完全独立于任何第三方库.您嵌入到流中的重试循环是业务逻辑".因此,无论是好是坏,混合实现都与 akka 紧密耦合.

The result is a cleaner stream & it avoids mixing "business logic" with akka code. This allows unit testing of the retry functionality completely independent from any third party library. The retry loop you have embedded in your stream is the "business logic". Therefore the mixed implementation is tightly coupled to akka going forward, for better or worse.

此外,在隔离解决方案中,循环包含在尾递归函数中,即惯用的 Scala.

Also, in the segregated solution the cycle is contained in a tail recursive function, which is idiomatic Scala.

这篇关于为什么 Akka 流循环不在此图中结束?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆