在条件下停止 Akka Streams 的正确方法 [英] Proper way to stop Akka Streams on condition
问题描述
我已经成功使用了 FileIO 流式传输文件的内容,计算每一行的一些转换并聚合/减少结果.
I have been successfully using FileIO to stream the contents of a file, compute some transformations for each line and aggregate/reduce the results.
现在我有一个非常具体的用例,我想在达到条件时停止流,这样就没有必要读取整个文件,但进程会尽快完成.实现这一目标的推荐方法是什么?
Now I have a pretty specific use case, where I would like to stop the stream when a condition is reached, so that it is not necessary to read the whole file but the process finishes as soon as possible. What is the recommended way to achieve this?
推荐答案
如果停止条件是在流的外面"
有一个名为 KillSwitch
的高级构建块,您可以使用它来执行此操作:http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html 流将被关闭一次通知终止开关.
If the stop condition is "on the outside of the stream"
There is a advanced building-block called KillSwitch
that you could use to do this: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html The stream would get shut down once the kill switch is notified.
它有像 abort(reason)
/shutdown
等方法,请看这里的 API:http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html
It has methods like abort(reason)
/ shutdown
etc, see here for it's API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html
参考文档在这里:http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala
示例用法是:
val countingSrc = Source(Stream.from(1)).delay(1.second,
DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
.viaMat(KillSwitches.single)(Keep.right)
.toMat(lastSnk)(Keep.both)
.run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
<小时>
如果停止条件在流内
您可以使用 takeWhile
来真正表达任何条件,尽管有时 take
或 limit
也可能足以占用 10 lnes".
If the stop condition is inside the stream
You can use takeWhile
to express any condition really, though sometimes take
or limit
may be also enough "take 10 lnes".
如果您的逻辑非常先进,您可以使用 statefulMapConcat
构建一个处理该特殊逻辑的特殊阶段,该阶段允许表达任何内容 - 这样您就可以在任何时候从里面".
If your logic is very advanced, you could build a special stage that handles that special logic using statefulMapConcat
that allows to express literally anything - so you could complete the stream whenever you want to "from the inside".
这篇关于在条件下停止 Akka Streams 的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!