正确停止Akka Streams的正确方法 [英] Proper way to stop Akka Streams on condition

查看:147
本文介绍了正确停止Akka Streams的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经成功使用 FileIO 以流式传输文件的内容,为每一行计算一些转换并汇总/减少结果。

I have been successfully using FileIO to stream the contents of a file, compute some transformations for each line and aggregate/reduce the results.

现在,我有一个非常具体的用例,在这种情况下,我想在达到条件时停止流,因此不必读取整个文件,而是该过程尽快完成。推荐的方法是什么?

Now I have a pretty specific use case, where I would like to stop the stream when a condition is reached, so that it is not necessary to read the whole file but the process finishes as soon as possible. What is the recommended way to achieve this?

推荐答案

如果停止条件是在流的外部



有一个名为 KillSwitch 的高级构建基块,您可以用来执行以下操作: http://doc.akka.io/japi/akka/2.4.7/akka/stream /KillSwitches.html 通知终止开关后,流将被关闭。

If the stop condition is "on the outside of the stream"

There is a advanced building-block called KillSwitch that you could use to do this: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html The stream would get shut down once the kill switch is notified.

它具有 abort(reason)之类的方法 / 关闭等,有关API的信息,请参见此处: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

It has methods like abort(reason) / shutdown etc, see here for it's API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

参考文档在此处: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic .html#kill-switch-scala

示例用法为:

val countingSrc = Source(Stream.from(1)).delay(1.second,
    DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2






如果停止条件在流内



您可以使用 takeWhile 表示任何条件,尽管有时 take limit 可能也足够服用10升。


If the stop condition is inside the stream

You can use takeWhile to express any condition really, though sometimes take or limit may be also enough "take 10 lnes".

如果您的逻辑非常先进,则可以使用 statefulMapConcat 建立一个特殊的阶段来处理该特殊逻辑可以表达任何内容-因此您可以随时从内部完成流。

If your logic is very advanced, you could build a special stage that handles that special logic using statefulMapConcat that allows to express literally anything - so you could complete the stream whenever you want to "from the inside".

这篇关于正确停止Akka Streams的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆