Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink)

Question

I'm totally new to Scala and Akka. I have a simple RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach

Now I want something like this:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach

But Flow2 should wait until 100 elements from Flow1 are available, then transform these 100 elements into a single new element (which needs all 100 elements from Flow1) and pass this new element to the Sink.

I did some research and found Explicit user defined buffers, but I don't understand how I can access all 100 elements from Flow1 in Flow2 and transform them. Can someone explain it? Or, even better, post a small, simple example? Or both?

Answer

Akka-defined collection

If you don't mind letting Akka choose the collection type (grouped emits an immutable.Seq), you can use the grouped operator instead:

// grouped(n) emits an immutable.Seq of up to n elements (assumes an implicit ActorSystem in scope)
val bufferSize = 10
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
                             .runWith(Sink foreach println)
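Applied to the pipeline from the question, grouped is all Flow2 needs: it buffers 100 elements, and a following map turns each batch into one new element. This is a minimal sketch assuming Akka 2.6, where an implicit ActorSystem also supplies the Materializer. The doubling in flow1 and the sum in flow2 are hypothetical placeholders for the real transformations, and Sink.seq stands in for Sink.foreach only so the output can be checked:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Akka 2.6: an implicit ActorSystem also provides the implicit Materializer
implicit val system: ActorSystem = ActorSystem("grouped-demo")

// Flow1: some per-element transformation (hypothetical placeholder: doubling)
val flow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

// Flow2: collect 100 elements, then turn the whole batch into one new element
// (hypothetical placeholder transformation: the sum of the batch)
val flow2: Flow[Int, Int, NotUsed] = Flow[Int].grouped(100).map(batch => batch.sum)

// Sink.seq gathers the output so it can be inspected; the question uses Sink.foreach
val result: Seq[Int] =
  Await.result(Source(1 to 300).via(flow1).via(flow2).runWith(Sink.seq), 10.seconds)

system.terminate()
```

Each output element is built from a full batch of 100. Note that grouped also emits a smaller final batch when the input length is not a multiple of 100.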

User-defined collection

If you want to control the type of collection used for your buffer, e.g. a Seq or Array:

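import scala.reflect.ClassTag

type MyCollectionType[X] = Array[X]

// Array.empty needs a ClassTag to build an array of a generic element type
def emptyMyCollection[X: ClassTag] : MyCollectionType[X] = Array.empty[X]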

Then you can perform this operation with two Flows. The first Flow executes a scan to build up a sequence of elements:

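import akka.NotUsed
import akka.stream.scaladsl.Flow

val bufferSize = 10

// Append i, starting a fresh collection once the previous one holds bufferSize elements
def appendToMyCollection[X: ClassTag](coll : MyCollectionType[X], i : X) : MyCollectionType[X] =
  (if (coll.size < bufferSize) coll else emptyMyCollection[X]) :+ i

// scan emits the running collection after every element (plus the initial empty one)
val buffer : Flow[Int, MyCollectionType[Int], NotUsed] =
  Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
    (coll, i) => appendToMyCollection(coll, i)
  }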
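To see what buffer emits before any filtering, the same helpers can be run on a plain Scala collection: scanLeft, like Akka's scan, emits the initial value first and then the accumulator after each element. This is a self-contained sketch using only the standard library. The helpers are copied here so the snippet runs on its own, and the 12-element input is an arbitrary choice:

```scala
import scala.reflect.ClassTag

// Helpers copied from the answer so this snippet is self-contained
type MyCollectionType[X] = Array[X]
val bufferSize = 10

def emptyMyCollection[X: ClassTag]: MyCollectionType[X] = Array.empty[X]

def appendToMyCollection[X: ClassTag](coll: MyCollectionType[X], i: X): MyCollectionType[X] =
  (if (coll.size < bufferSize) coll else emptyMyCollection[X]) :+ i

// scanLeft mirrors Akka's scan: initial value first, then the accumulator after each element
val sizes: List[Int] =
  (1 to 12).toList
    .scanLeft(emptyMyCollection[Int])((coll, i) => appendToMyCollection(coll, i))
    .map(_.size)
```

The sizes climb to bufferSize and then restart at 1. That is why the second Flow only has to keep the collections whose size equals bufferSize.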

The second Flow is a filter that keeps only the collections of exactly the right size (hence "goldiLocks"):

val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
  Flow[MyCollectionType[Int]].filter(_.size == bufferSize)

These two Flows can be combined to produce a Stream which will generate the desired collection type:

val stream = Source(1 to 100).via(buffer)
                             .via(goldiLocks)
                             .runWith(Sink foreach println)
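The two approaches behave differently when the stream length is not a multiple of bufferSize. grouped emits the trailing partial batch, while buffer followed by goldiLocks silently drops it. The following is a standard-library sketch of both on a hypothetical 25-element input, using Vector instead of Array for brevity; the collection operators only mimic the Akka operators and are not Akka code:

```scala
val bufferSize = 10
val input = 1 to 25 // deliberately not a multiple of bufferSize

// Counterpart of Flow[Int].grouped(bufferSize): the last, partial batch is kept
val viaGrouped: List[Seq[Int]] = input.grouped(bufferSize).toList

// Counterpart of buffer + goldiLocks (scan, then filter on size): the partial batch is dropped
val viaScanFilter: List[Vector[Int]] =
  input.toList
    .scanLeft(Vector.empty[Int]) { (coll, i) =>
      (if (coll.size < bufferSize) coll else Vector.empty[Int]) :+ i
    }
    .filter(_.size == bufferSize)
```

If every element must end up in some batch, grouped is the safer choice. If only complete batches are meaningful, the scan-and-filter version avoids handling a short final batch.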
