Akka stream batching


Question

I'm learning Akka Streams. I have a stream of records, many per time unit, already ordered by time (they come from Slick), and I want to batch them into time groups for processing by detecting when the time step changes.

Example

case class Record(time: Int, payload: String)

If the incoming stream is

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

I would like to transform it to

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...

So far I have only found grouping by a fixed number of records, or splitting into many substreams, and as far as I can tell I don't need multiple substreams.

Update: I found batch, but it looks more concerned with backpressure than with batching all the time.

Recommended answer

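statefulMapConcat is the multitool of the Akka Streams library. It keeps mutable state for each materialization of the stream and lets every incoming element emit zero or more outputs, which is what batching on a key change needs.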

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        payloads = payloads :+ record.payload
        Nil
      } else {
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)

Running the above prints the following:

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

The first line, (0,List()), is the empty initial state being flushed when the first record arrives. You can adjust the example to emit Batch objects instead of tuples.
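As a minimal sketch of that adjustment (not part of the original answer), the version below emits Batch objects and avoids the empty leading batch. It assumes Akka 2.6 or later, where the implicit ActorSystem supplies the materializer. The Batch case class follows the shape shown in the question. The object name BatchByTime and the use of a None element as the end-of-stream marker are my own choices: the marker replaces the Record(0, "notused") sentinel so that no real time value has to be reserved.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

final case class Record(time: Int, payload: String)
final case class Batch(time: Int, payloads: Seq[String])

// Hypothetical wrapper object, used here only to make the sketch runnable.
object BatchByTime {

  // Runs the stream to completion and returns the collected batches.
  def run(): Seq[Batch] = {
    // Assumption: Akka 2.6+, so no explicit materializer is required.
    implicit val system: ActorSystem = ActorSystem("batch-by-time")

    val records = Source(List(
      Record(1, "a"), Record(1, "k"), Record(1, "k"), Record(1, "a"),
      Record(2, "r"), Record(2, "o"), Record(2, "c"), Record(2, "k"),
      Record(2, "s"), Record(3, "!")
    ))

    val batches = records
      .map(Option(_))                             // Some(record) for real data
      .concat(Source.single(Option.empty[Record])) // None marks end of stream
      .statefulMapConcat[Batch] { () =>
        // The batch currently being filled; None until the first record.
        var open: Option[Batch] = None

        elem => elem match {
          case Some(r) if open.exists(_.time == r.time) =>
            // Same time step: append and emit nothing yet.
            open = open.map(b => b.copy(payloads = b.payloads :+ r.payload))
            Nil
          case Some(r) =>
            // Time step changed: emit the finished batch (if any), start a new one.
            val finished = open.toList
            open = Some(Batch(r.time, Vector(r.payload)))
            finished
          case None =>
            // End of stream: flush whatever is still open.
            val finished = open.toList
            open = None
            finished
        }
      }

    try Await.result(batches.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

Because the state starts as None rather than as time 0 with an empty list, nothing is emitted before the first real batch is complete. The trailing marker is still needed: statefulMapConcat only runs when an element arrives, so without it the last batch would never be flushed.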
