Akka stream batching
Question
Learning Akka Streams. I have a stream of records, many per time unit, already ordered by time (from Slick), and I want to batch them into time groups for processing by detecting when the time step changes.
Example
case class Record(time: Int, payload: String)
If the incoming stream is
Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...
I'd like to transform it into
Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
So far I've only found grouping by a fixed number of records, or splitting into many substreams, but from my perspective I don't need multiple substreams.
Update: I found batch, but it looks more concerned with backpressure than with batching all the time.
Answer
statefulMapConcat is the multitool in the Akka Streams library.
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("batching")

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
    .concat(Source.single(Record(0, "notused"))) // sentinel: needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil
    record =>
      if (record.time == currentTime) {
        // same time step: accumulate, emit nothing downstream
        payloads = payloads :+ record.payload
        Nil
      } else {
        // time step changed: emit the completed group, start a new one
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)
Running the above prints the following:
(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))
You can adjust the example to print Batch objects.
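As a sketch of that adjustment, the stateful per-element function can be pulled out into a factory that returns Batch objects and suppresses the empty initial group. The batcher name is hypothetical; the factory has the same shape statefulMapConcat expects (() => Record => immutable.Iterable[Batch]), so the same logic can be exercised outside a running stream by folding it over a plain List with flatMap:

```scala
case class Record(time: Int, payload: String)
case class Batch(time: Int, payloads: Seq[String])

// Hypothetical factory with the shape statefulMapConcat expects:
// each call returns a fresh stateful Record => List[Batch] function.
def batcher(): Record => List[Batch] = {
  var currentTime = 0
  var payloads: Seq[String] = Nil
  record =>
    if (record.time == currentTime) {
      payloads = payloads :+ record.payload // same time step: keep accumulating
      Nil
    } else {
      // time step changed: emit the finished group (skipping the empty initial one)
      val emitted =
        if (payloads.nonEmpty) List(Batch(currentTime, payloads)) else Nil
      currentTime = record.time
      payloads = Seq(record.payload)
      emitted
    }
}

// Outside a stream, flatMap over a List mirrors statefulMapConcat:
val input = List(
  Record(1, "a"), Record(1, "k"), Record(1, "k"), Record(1, "a"),
  Record(2, "r"), Record(2, "o"), Record(2, "c"), Record(2, "k"), Record(2, "s"),
  Record(3, "!"),
  Record(0, "notused") // sentinel flushes the last group
)
val batches = input.flatMap(batcher())
batches.foreach(println)
// Batch(1,List(a, k, k, a))
// Batch(2,List(r, o, c, k, s))
// Batch(3,List(!))
```

In the stream itself you would plug the factory in as .statefulMapConcat(() => batcher()). One caveat: the sentinel assumes time 0 never occurs in real data; if it can, pick a value outside the domain.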