在Akka Streams中从演员创建流 [英] Creating a flow from actor in Akka Streams

查看:71
本文介绍了在Akka Streams中从演员创建流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

可以使用 Source.actorPublisher() Sink.actorSubscriber()从参与者创建源和接收器方法分别。但是有可能从演员创建 Flow 吗?

It's possible to create sources and sinks from actors using Source.actorPublisher() and Sink.actorSubscriber() methods respectively. But is it possible to create a Flow from actor?

从概念上讲,似乎没有充分的理由不会,因为它同时实现了 ActorPublisher ActorSubscriber 特征,但不幸的是, Flow 对象没有任何执行此操作的方法。在这篇优秀博客文章中,它是在较早版本的Akka Streams中完成的,因此问题是最新版本(2.4.9)是否也可能。

Conceptually there doesn't seem to be a good reason not to, given that it implements both ActorPublisher and ActorSubscriber traits, but unfortunately, the Flow object doesn't have any method for doing this. In this excellent blog post it's done in an earlier version of Akka Streams, so the question is if it's possible also in the latest (2.4.9) version.

推荐答案

我是Akka团队的一员,想要使用这个问题来阐明有关原始Reactive Streams接口的一些信息。我希望您会发现这很有用。

I'm part of the Akka team and would like to use this question to clarify a few things about the raw Reactive Streams interfaces. I hope you'll find this useful.

最值得注意的是,我们很快就会在Akka团队博客上发布有关构建自定义阶段(包括Flows)的多个帖子,因此

Most notably, we'll be posting multiple posts on the Akka team blog about building custom stages, including Flows, soon, so keep an eye on it.

不要使用ActorPublisher / ActorSubscriber

请不要使用 ActorPublisher ActorSubscriber 。它们的级别太低,您可能最终以违反反应流规范的方式实施它们。它们是过去的遗物,即使在那时也只是仅限高级用户模式。如今,确实没有理由使用这些类。我们从未提供过构建流程的方法,因为如果将其公开为原始 Actor API,供您实现并获取

Please don't use ActorPublisher and ActorSubscriber. They're too low level and you might end up implementing them in such a way that's violating the Reactive Streams specification. They're a relict of the past and even then were only "power-user mode only". There really is no reason to use those classes nowadays. We never provided a way to build a flow because the complexity is simply explosive if it was exposed as "raw" Actor API for you to implement and get all the rules implemented correctly.

如果您真的想实现原始ReactiveStreams接口,请使用规范的TCK 来验证您的实现是否正确。 Flow (或者在RS术语中, Processor )可能会使您措手不及

If you really really want to implement raw ReactiveStreams interfaces, then please do use the Specification's TCK to verify your implementation is correct. You will likely be caught off guard by some of the more complex corner cases a Flow (or in RS terminology a Processor has to handle).

大多数操作都可以在不进行底层操作的情况下进行。

您应该能够通过<< c $ c> Flow [T] 构建许多流,并在其上添加所需的操作,例如:

Many flows you should be able to simply build by building from a Flow[T] and adding the needed operations onto it, just as an example:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)

这是流的可重用描述。

在询问高级用户模式时,这是DSL本身上功能最强大的运营商: statefulFlatMapConcat 。可以使用它来表达对普通流元素进行的绝大多数操作: Flow.statefulMapConcat [T](f:()⇒(Out)⇒Iterable [T]):Repr [T]

Since you're asking about power user mode, this is the most powerful operator on the DSL itself: statefulFlatMapConcat. The vast majority of operations operating on plain stream elements is expressable using it: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T].

如果需要计时器,则可以使用来源 zip 。计时器等。

If you need timers you could zip with a Source.timer etc.

GraphStage是最简单且最安全的API构建自定义阶段

GraphStage is the simplest and safest API to build custom stages

相反,构建Sources / Flows / Sinks具有自己强大的和安全 API: GraphStage 。请阅读有关构建自定义GraphStages的文档(可以是接收器/源/流,甚至可以是任何任意形状)。它为您处理所有复杂的Reactive Streams规则,同时在实现阶段(可能是Flow)的同时为您提供充分的自由和类型安全性。

Instead, building Sources/Flows/Sinks has its own powerful and safe API: the GraphStage. Please read the documentation about building custom GraphStages (they can be a Sink/Source/Flow or even any arbitrary shape). It handles all of the complex Reactive Streams rules for you, while giving you full freedom and type-safety while implementing your stages (which could be a Flow).

,取自文档,是 filter(T =>布尔值)运算符的GraphStage实现:

For example, taken from the docs, is an GraphStage implementation of the filter(T => Boolean) operator:

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out, elem)
          else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

除了文档,这些博客文章还详细解释了为什么此API是构建自定义阶段的圣杯。任何形状:

In addition to the docs, these blog posts explain in detail why this API is the holy grail of building custom stages of any shape:

  • Akka team blog: Mastering GraphStages (part 1, introduction) - a high level overview
  • ... tomorrow we'll publish one about it's API as well...
  • Kunicki blog: Implementing a Custom Akka Streams Graph Stage - another example implementing sources (really applies 1:1 to building Flows)

这篇关于在Akka Streams中从演员创建流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆