Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink)
Question
I'm totally new to Scala and Akka. I have a simple RunnableFlow:
Source -> Flow (do some transformation) -> Sink.runForeach
Now I want something like this:
Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach
But Flow2 should wait until 100 elements from Flow1 are available, then transform these 100 elements into one new element (which needs all 100 elements from Flow1) and pass this new element to the Sink.
I did some research and found "Explicit user defined buffers", but I don't understand how I can access all 100 elements from Flow1 in Flow2 and do some transformation with them. Can someone explain it? Or, even better, post a small, simple example? Or both?
Recommended answer
Akka-defined collection
If you don't mind using a collection type chosen by Akka, you can use the grouped function:
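// alternative stream formation; needs an implicit ActorSystem/Materializer in scope
// bufferSize is the group size (defined as 10 further down; use 100 for the question's case)
val stream = Source(1 to 100)
  .via(Flow[Int].grouped(bufferSize)) // emits immutable.Seq[Int] groups of up to bufferSize elements
  .runWith(Sink.foreach(println))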
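To connect this back to the question, here is a minimal sketch of the full Source -> Flow1 -> Flow2 -> Sink pipeline, where Flow2 uses grouped(100) followed by map to turn each batch of 100 elements into one new element. The names GroupedExample, flow1 and flow2, the doubling step and the sum are all hypothetical placeholders for your own transformations, and the sketch assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupedExample {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, where the implicit ActorSystem also provides the materializer
    implicit val system: ActorSystem = ActorSystem("grouped-example")
    try {
      // Flow1: some per-element transformation (hypothetical: doubling)
      val flow1 = Flow[Int].map(_ * 2)

      // Flow2: wait for 100 elements, then build one new element from all of them
      // (hypothetical: their sum)
      val flow2 = Flow[Int].grouped(100).map(batch => batch.sum)

      // Sink.seq collects everything the stream emits into a Future of a Seq
      val result = Source(1 to 300).via(flow1).via(flow2).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling GroupedExample.run() should yield three values, one per batch of 100 inputs. Note that grouped also emits a final, smaller group if the stream ends before a batch is full, so the function passed to map should cope with fewer than 100 elements if that can happen.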
User-defined collection
If you want to control the type of collection used for your buffer, e.g. a Seq or an Array:
type MyCollectionType[X] = Array[X]
// Array.empty needs a ClassTag to create the underlying JVM array
def emptyMyCollection[X : scala.reflect.ClassTag] : MyCollectionType[X] = Array.empty[X]
Then you can perform this operation with two Flows. The first Flow executes a scan to build up a sequence of elements:
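val bufferSize = 10
// Append i to the buffer; once the buffer is full, start a fresh one holding only i.
// The ClassTag is required because :+ on an Array allocates a new array.
def appendToMyCollection[X : scala.reflect.ClassTag](coll : MyCollectionType[X], i : X) : MyCollectionType[X] =
  (if(coll.size < bufferSize) coll else emptyMyCollection[X]) :+ i
val buffer : Flow[Int, MyCollectionType[Int], _] =
  Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
    (coll, i) => appendToMyCollection(coll, i)
  }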
The second Flow is a filter that only lets through a sequence with just the right size (i.e. "goldiLocks"):
val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
Flow[MyCollectionType[Int]].filter(_.size == bufferSize)
These two Flows can be combined to produce a Stream which will generate the desired collection type:
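// mkString is used because println on an Array prints only its object reference
val stream = Source(1 to 100).via(buffer)
  .via(goldiLocks)
  .runWith(Sink.foreach[MyCollectionType[Int]](coll => println(coll.mkString(", "))))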
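To see why the goldiLocks filter is needed, it may help to replay the same logic on an ordinary collection. The sketch below is an illustration only, not Akka code: it uses scanLeft from the Scala standard library, which, like the stream scan operator, yields the initial value followed by every intermediate accumulation. The object name ScanFilterSketch, the append helper, the buffer size of 3 and the use of Vector (for readable printing) are assumptions made for the example.

```scala
object ScanFilterSketch {
  val bufferSize = 3

  // Same rule as appendToMyCollection: grow the buffer until it is full,
  // then start a fresh buffer that holds only the new element
  def append(coll: Vector[Int], i: Int): Vector[Int] =
    (if (coll.size < bufferSize) coll else Vector.empty[Int]) :+ i

  // Every intermediate buffer is produced, starting with the empty one:
  // [], [1], [1,2], [1,2,3], [4], [4,5], [4,5,6], [7]
  val scanned: Vector[Vector[Int]] =
    (1 to 7).toVector.scanLeft(Vector.empty[Int])(append)

  // The filter keeps only the buffers that reached exactly bufferSize
  val full: Vector[Vector[Int]] = scanned.filter(_.size == bufferSize)
}
```

Here ScanFilterSketch.full contains only the two complete buffers. The trailing element 7 never reaches the required size and is dropped, which is one behavioural difference from grouped, where the last, smaller group is still emitted.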