How to clean up substreams in continuous Akka streams


Question

Given I have a very long-running stream of events flowing through something like the code shown below. After a long time has passed, many substreams will have been created that are no longer needed.


Is there a way to clean up a specific substream at a given time? For example, the substream created by id 3 should be cleaned up, and the state in the scan method dropped, at 13:00 (the expires property of Wid).



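import java.time.LocalDateTime
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

case class Wid(id: Int, v: String, expires: LocalDateTime)
// Requires an implicit ActorSystem in scope (provided by the test suite).
test("Substream with scan") {
  val (pub, sub) = TestSource.probe[Wid]
    .groupBy(Int.MaxValue, _.id)               // one substream per id
    .scan("")((a: String, b: Wid) => a + b.v)  // per-substream accumulated state
    .mergeSubstreams
    .toMat(TestSink.probe[String])(Keep.both)
    .run()
}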


Recommended answer

TL;DR You can close a substream after some time. However, using input to dynamically set the time with built-in stages is another matter.

Closing a substream

To close a flow, you usually complete it (from upstream), but you can also cancel it (from downstream). For instance, the take(n: Int) flow will cancel once n elements have gone through.

Now, in the groupBy case, you cannot complete a substream, since upstream is shared by all substreams, but you can cancel it. How depends on what condition you want to put on its end.

However, be aware that groupBy drops inputs for substreams that have already been closed: if a new element with id 3 comes from upstream to the groupBy after the 3-substream has been closed, it will simply be ignored and the next element will be pulled in. The reason for this is probably that some elements might be lost between the closing and re-opening of the substream. Also, if your stream is supposed to run for a very long time, this will affect performance, because each element is checked against the list of closed substreams before being forwarded to the relevant (live) substream. You might want to implement your own stateful filter (say, with a Bloom filter) if you're not satisfied with the performance of this.

To close a substream, I usually use either take (if you want only a given number of elements, but that's probably not the case on an infinite stream) or some kind of timeout: completionTimeout if you want a fixed time from materialization to closure, or idleTimeout if you want to close when no element has come through for some time. Note that these flows do not cancel the stream but fail it, so you have to catch the exception with a recover or recoverWith stage to change the failure into a cancel (recoverWith allows you to cancel without sending any last element, by recovering with Source.empty).
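As a minimal sketch of the timeout approach described above, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer): each substream is failed by idleTimeout after 5 seconds without elements, and the failure is turned into a normal end of the substream by recovering with an empty source. The object name IdleTimeoutSubstreams, the simplified Wid (without expires), and the 5-second value are illustrative assumptions. recoverWithRetries is used here in place of recoverWith.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Future, TimeoutException}
import scala.concurrent.duration._

object IdleTimeoutSubstreams {
  // Simplified event type for illustration (no expires field).
  final case class Wid(id: Int, v: String)

  // Groups events by id, closes a substream that has been idle for 5 seconds,
  // and collects every accumulated string emitted by scan.
  def run(events: List[Wid])(implicit system: ActorSystem): Future[Seq[String]] =
    Source(events)
      .groupBy(Int.MaxValue, _.id)
      // Fails the substream with a TimeoutException when it is idle too long.
      .idleTimeout(5.seconds)
      // Replace the failure with an empty source so the substream just ends.
      .recoverWithRetries(1, { case _: TimeoutException => Source.empty[Wid] })
      .scan("")((acc, w) => acc + w.v)
      .mergeSubstreams
      .runWith(Sink.seq)
}
```

Because the timeout sits before scan, the scan state of a timed-out substream is discarded together with the substream. Later elements with the same key are then ignored by groupBy, as noted earlier.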

Setting the timeout dynamically

Now, what you want is to set the closing time dynamically, according to the first element that passes through. This is more complicated because the materialization of streams is independent of the elements that pass through them. Indeed, in the usual case (without groupBy), streams are materialized before any element goes through them, so it makes no sense to use elements to materialize them.

I had similar issues in that question, and ended up using a modified version of groupBy with signature

paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM])

which allows defining each substream using the key that created it. This can be modified to take the first element (instead of the key) as its parameter.

Another (probably simpler, in your case) way would be to write your own stage that does exactly what you want: get end-time from first element and cancel the stream at that time. Here is an example implementation for this (I used a scheduler instead of setting a state):

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}
import scala.concurrent.duration.FiniteDuration

// Key identifying the single timer used by the stage.
object CancelAfterTimer

class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("CancelAfter.in")
  val out = Outlet[T]("CancelAfter.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
    override def onPush(): Unit = {
      val elem = grab(in)
      // Only the first element schedules the timer; later ones find it active.
      if (!isTimerActive(CancelAfterTimer))
        scheduleOnce(CancelAfterTimer, getTimeout(elem))
      push(out, elem)
    }

    override def onTimer(timerKey: Any): Unit =
      completeStage() // cancels upstream and completes downstream

    override def onPull(): Unit = pull(in)

    setHandlers(in, out, this)
  }
}
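To plug the question's Wid.expires field into CancelAfter, the absolute expiry time has to be converted into a FiniteDuration. The helper below is one possible sketch; the names ExpiryTimeout and timeUntil are hypothetical, and clamping an already-expired timestamp to zero is an assumption about the desired behaviour.

```scala
import java.time.{Duration => JDuration, LocalDateTime}
import scala.concurrent.duration._

object ExpiryTimeout {
  // Time remaining from `now` until `expires`, never negative.
  def timeUntil(expires: LocalDateTime, now: LocalDateTime): FiniteDuration = {
    val millis = JDuration.between(now, expires).toMillis
    math.max(millis, 0L).millis
  }
}

// Possible usage with the stage above (not compiled here):
//   .groupBy(Int.MaxValue, _.id)
//   .via(new CancelAfter[Wid](w => ExpiryTimeout.timeUntil(w.expires, LocalDateTime.now())))
//   .scan("")((a, b) => a + b.v)
//   .mergeSubstreams
```

Placing the stage before scan means the accumulated state is dropped when the substream is cancelled.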
