从GraphStage内部关闭Akka流(Akka 2.4.2) [英] Closing an Akka stream from inside a GraphStage (Akka 2.4.2)

查看:147
本文介绍了从GraphStage内部关闭Akka流(Akka 2.4.2)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Akka Stream 2.4.2中,已弃用PushStage。对于Streams 2.0.3,我正在使用以下答案中的解决方案:

In Akka Stream 2.4.2, PushStage has been deprecated. For Streams 2.0.3 I was using the solution from this answer:

如何关闭Akka流?

原为:

import akka.stream.stage._

    val closeStage = new PushStage[Tpe, Tpe] {
      override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
        case elem if shouldCloseStream ⇒
          // println("stream closed")
          ctx.finish()
        case elem ⇒
          ctx.push(elem)
      }
    }

如何在GraphStage / onPush()内部立即关闭2.4.2中的流?

How would I close a stream in 2.4.2 immediately, from inside a GraphStage / onPush() ?

推荐答案

像这样:

val closeStage = new GraphStage[FlowShape[Tpe, Tpe]] {
  val in = Inlet[Tpe]("closeStage.in")
  val out = Outlet[Tpe]("closeStage.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case elem if shouldCloseStream ⇒
          // println("stream closed")
          completeStage()
        case msg ⇒
          push(out, msg)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

它比较冗长,但一方可以以可重用的方式定义此逻辑,另一方面,不必再担心流元素之间的差异,因为 GraphStage 可以以与流相同的方式处理将被处理:

It is more verbose but one the one side one can define this logic in a reusable way and on the other side one no longer has to worry about differences between the stream elements because the GraphStage can be handled in the same way as a flow would be handled:

val flow: Flow[Tpe] = ???
val newFlow = flow.via(closeStage)

这篇关于从GraphStage内部关闭Akka流(Akka 2.4.2)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆