合成流的目的是什么(来自Sink和Source)? [英] What is the purpose of the composite flow(from Sink and Source)?
问题描述
我正在尝试从
有人可以举一个使用复合流的例子吗?
什么时候应该使用它?
Flow.fromSinkAndSource 提供了一种方便的方法来组装由以下内容组成的流
接收器
作为输入, source
作为未连接的输出,可以用下图(在API链接中提供)最好地说明:
+ ------------------------------------ ---------- +
|结果流[I,O,未使用] |
| |
| + --------- + + ----------- + |
| | | | | |
我~~> |水槽[I] | [无连接!] |来源[O] | ~~> O
| | | | | |
| + --------- + + ----------- + |
+ --------------------------------------------- -+
如@gabrielgiussi的答案所示,它通常用于需要切换的情况将现有来源
(或流
)的输出输出到某些不同的输出-用于测试或其他目的。这是一个简单的示例:
import akka.actor.ActorSystem
import akka.stream .scaladsl._
隐式val系统= ActorSystem( system)
隐式val实体化= ActorMaterializer()
val switchFlow = Flow.fromSinkAndSource(Sink.ignore,Source( List( a, b, c))))
Source(1到5).via(switchFlow).runForeach(println)
// res1:scala。 parallel.Future [akka.Done] = Future(Success(Done))
// a
// b
// c
还值得注意的是该方法的 Mat版本, fromSinkAndSourceMat ,有一些有趣的用例。一个示例是使用 Source.maybe [T]
来保持半关闭
WebSockets打开的状态。 Promise [Option [T]]
作为实现的值,当您要关闭连接时将完成。以下是 Akka-http WebSockets客户端支持文档:
//使用Source。可能实现为一个承诺
// //这将使我们能够在以后完成源
val流:Flow [Message,Message,Promise [Option [Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach [Message](println),
Source.maybe [Message])(Keep.right)
val(upgradeResponse,promise )=
Http()。singleWebSocketRequest(
WebSocketRequest( ws://example.com:8080 / some / path),
flow)
/ /稍后再断开
promise.success(无)
I am trying the understand composite flow (from Sink and Source) from the website and they represent as the following:
Could someone please provide an example for the usage of composite flow.
And when should I use it?
Flow.fromSinkAndSource provides a convenient way to assemble a flow
composed with a sink
as its input and a source
as its output that are not connected, which can be best illustrated with the following diagram (available in the API link):
+----------------------------------------------+
| Resulting Flow[I, O, NotUsed] |
| |
| +---------+ +-----------+ |
| | | | | |
I ~~>| Sink[I] | [no-connection!] | Source[O] | ~~> O
| | | | | |
| +---------+ +-----------+ |
+----------------------------------------------+
As shown in @gabrielgiussi's answer, it's often used in cases where one wants to "switch" the output of an existing source
( or flow
) to some different output - for testing purposes or what-not. Here's a trivialized example:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
val switchFlow = Flow.fromSinkAndSource( Sink.ignore, Source(List("a", "b", "c")) )
Source(1 to 5).via(switchFlow).runForeach(println)
// res1: scala.concurrent.Future[akka.Done] = Future(Success(Done))
// a
// b
// c
It's also worth noting that the method's "Mat" version, fromSinkAndSourceMat, has some interesting use cases. An example is to use it to keep half-closed
WebSockets open by using Source.maybe[T]
to maintain a Promise[Option[T]]
as the materialized value which will be completed when one wants to close the connection. Below is the sample code from the relevant section in the Akka-http WebSockets client support document:
// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach[Message](println),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) =
Http().singleWebSocketRequest(
WebSocketRequest("ws://example.com:8080/some/path"),
flow)
// at some later time we want to disconnect
promise.success(None)
这篇关于合成流的目的是什么(来自Sink和Source)?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!