在条件下停止 Akka Streams 的正确方法 [英] Proper way to stop Akka Streams on condition

查看:25
本文介绍了在条件下停止 Akka Streams 的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经成功使用了 FileIO 流式传输文件的内容,计算每一行的一些转换并聚合/减少结果.

I have been successfully using FileIO to stream the contents of a file, compute some transformations for each line and aggregate/reduce the results.

现在我有一个非常具体的用例,我想在达到条件时停止流,这样就没有必要读取整个文件,但进程会尽快完成.实现这一目标的推荐方法是什么?

Now I have a pretty specific use case, where I would like to stop the stream when a condition is reached, so that it is not necessary to read the whole file but the process finishes as soon as possible. What is the recommended way to achieve this?

推荐答案

如果停止条件是在流的外面"

有一个名为 KillSwitch 的高级构建块,您可以使用它来执行此操作:http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html 流将被关闭一次通知终止开关.

If the stop condition is "on the outside of the stream"

There is a advanced building-block called KillSwitch that you could use to do this: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html The stream would get shut down once the kill switch is notified.

它有像 abort(reason)/shutdown 等方法,请看这里的 API:http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

It has methods like abort(reason) / shutdown etc, see here for it's API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

参考文档在这里:http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

示例用法是:

val countingSrc = Source(Stream.from(1)).delay(1.second,
    DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2

<小时>

如果停止条件在流内

您可以使用 takeWhile 来真正表达任何条件,尽管有时 takelimit 也可能足以占用 10 lnes".


If the stop condition is inside the stream

You can use takeWhile to express any condition really, though sometimes take or limit may be also enough "take 10 lnes".

如果您的逻辑非常先进,您可以使用 statefulMapConcat 构建一个处理该特殊逻辑的特殊阶段,该阶段允许表达任何内容 - 这样您就可以在任何时候从里面".

If your logic is very advanced, you could build a special stage that handles that special logic using statefulMapConcat that allows to express literally anything - so you could complete the stream whenever you want to "from the inside".

这篇关于在条件下停止 Akka Streams 的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆