合成流的目的是什么(来自Sink和Source)? [英] What is the purpose of the composite flow(from Sink and Source)?

查看:166
本文介绍了合成流的目的是什么(来自Sink和Source)?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从



有人可以举一个使用复合流的例子吗?

什么时候应该使用它?

解决方案

Flow.fromSinkAndSource 提供了一种方便的方法来组装由以下内容组成的 接收器作为输入, source 作为未连接的输出,可以用下图(在API链接中提供)最好地说明:

  + ------------------------------------ ---------- + 
|结果流[I,O,未使用] |
| |
| + --------- + + ----------- + |
| | | | | |
我~~> |水槽[I] | [无连接!] |来源[O] | ~~> O
| | | | | |
| + --------- + + ----------- + |
+ --------------------------------------------- -+

如@gabrielgiussi的答案所示,它通常用于需要切换的情况将现有来源(或)的输出输出到某些不同的输出-用于测试或其他目的。这是一个简单的示例:

  import akka.actor.ActorSystem 
import akka.stream .scaladsl._
隐式val系统= ActorSystem( system)
隐式val实体化= ActorMaterializer()

val switchFlow = Flow.fromSinkAndSource(Sink.ignore,Source( List( a, b, c))))

Source(1到5).via(switchFlow).runForeach(println)
// res1:scala。 parallel.Future [akka.Done] = Future(Success(Done))
// a
// b
// c

还值得注意的是该方法的 Mat版本, fromSinkAndSourceMat ,有一些有趣的用例。一个示例是使用 Source.maybe [T] 来保持半关闭 WebSockets打开的状态。 Promise [Option [T]] 作为实现的值,当您要关闭连接时将完成。以下是 Akka-http WebSockets客户端支持文档:

  //使用Source。可能实现为一个承诺
// //这将使我们能够在以后完成源
val流:Flow [Message,Message,Promise [Option [Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach [Message](println),
Source.maybe [Message])(Keep.right)

val(upgradeResponse,promise )=
Http()。singleWebSocketRequest(
WebSocketRequest( ws://example.com:8080 / some / path),
flow)

/ /稍后再断开
promise.success(无)


I am trying the understand composite flow (from Sink and Source) from the website and they represent as the following:

Could someone please provide an example for the usage of composite flow.
And when should I use it?

解决方案

Flow.fromSinkAndSource provides a convenient way to assemble a flow composed with a sink as its input and a source as its output that are not connected, which can be best illustrated with the following diagram (available in the API link):

  +----------------------------------------------+
  | Resulting Flow[I, O, NotUsed]                |
  |                                              |
  |  +---------+                  +-----------+  |
  |  |         |                  |           |  |
I ~~>| Sink[I] | [no-connection!] | Source[O] | ~~> O
  |  |         |                  |           |  |
  |  +---------+                  +-----------+  |
  +----------------------------------------------+

As shown in @gabrielgiussi's answer, it's often used in cases where one wants to "switch" the output of an existing source( or flow) to some different output - for testing purposes or what-not. Here's a trivialized example:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

val switchFlow = Flow.fromSinkAndSource( Sink.ignore, Source(List("a", "b", "c")) )

Source(1 to 5).via(switchFlow).runForeach(println)
// res1: scala.concurrent.Future[akka.Done] = Future(Success(Done))
// a
// b
// c

It's also worth noting that the method's "Mat" version, fromSinkAndSourceMat, has some interesting use cases. An example is to use it to keep half-closed WebSockets open by using Source.maybe[T] to maintain a Promise[Option[T]] as the materialized value which will be completed when one wants to close the connection. Below is the sample code from the relevant section in the Akka-http WebSockets client support document:

// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
  Flow.fromSinkAndSourceMat(
    Sink.foreach[Message](println),
    Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) =
  Http().singleWebSocketRequest(
    WebSocketRequest("ws://example.com:8080/some/path"),
    flow)

// at some later time we want to disconnect
promise.success(None)

这篇关于合成流的目的是什么(来自Sink和Source)?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆