Akka Streams: State in a flow
Question
I want to use Akka Streams to read multiple big files and process each line. Imagine that each line consists of an (identifier -> value) pair. If a new identifier is found, I want to save it and its value in the database; otherwise, if the identifier has already been seen while processing the stream of lines, I want to save only the value. For that, I think I need some kind of recursive stateful flow that keeps the identifiers already found in a Map. I think this flow would receive a pair of (newLine, contextWithIdentifiers).
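The (newLine, contextWithIdentifiers) idea is essentially a left fold: each step receives the next line together with the current context and returns an updated context. Here is a minimal plain-Scala sketch of that state threading, with no Akka involved. It assumes lines are already parsed into (identifier, value) string pairs and keeps the context in a Set, since only the identifiers need remembering. StateFold, Write, SaveIdAndValue, SaveValueOnly, step and plan are hypothetical names, and the database calls are replaced by returned commands.

```scala
// Hypothetical model of the question's idea: each step receives
// (newLine, contextWithIdentifiers) and returns the updated context plus
// the database write that the line requires. No Akka, no real database.
object StateFold {
  // Hypothetical write commands standing in for the real database calls.
  sealed trait Write
  final case class SaveIdAndValue(id: String, value: String) extends Write
  final case class SaveValueOnly(id: String, value: String) extends Write

  // One step of the "recursive" state: decide what to write for this line
  // and return the context extended with the line's identifier if it is new.
  def step(context: Set[String], line: (String, String)): (Set[String], Write) = {
    val (id, value) = line
    if (context.contains(id)) (context, SaveValueOnly(id, value))
    else (context + id, SaveIdAndValue(id, value))
  }

  // Thread the context through every line with foldLeft, collecting writes.
  def plan(lines: Seq[(String, String)]): Vector[Write] =
    lines.foldLeft((Set.empty[String], Vector.empty[Write])) {
      case ((context, writes), line) =>
        val (nextContext, write) = step(context, line)
        (nextContext, writes :+ write)
    }._2

  def main(args: Array[String]): Unit = {
    val lines = Seq("a" -> "1", "b" -> "2", "a" -> "3")
    plan(lines).foreach(println)
  }
}
```

This version collects every write in memory, so it only illustrates how the context is carried from line to line; for big files the same step logic would have to live inside a stream stage rather than a fold over a fully loaded collection.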
I've just started looking into Akka Streams. I think I can manage the stateless processing myself, but I have no idea how to keep the contextWithIdentifiers. I'd appreciate any pointers in the right direction.
Recommended answer
Maybe something like statefulMapConcat can help you:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Random
import scala.concurrent.ExecutionContext.Implicits.global

implicit val system: ActorSystem = ActorSystem()
//ActorMaterializer is deprecated since Akka 2.6, where the implicit ActorSystem is enough
implicit val materializer: ActorMaterializer = ActorMaterializer()

//encapsulating your input
case class IdentValue(id: Int, value: String)

//some randomly generated input: ids in 0..4, so some ids repeat
val identValues = List.fill(20)(IdentValue(Random.nextInt(5), "valueHere"))

val stateFlow = Flow[IdentValue].statefulMapConcat { () =>
  //state with already processed ids, created once per materialization
  var ids = Set.empty[Int]
  identValue =>
    if (ids.contains(identValue.id)) {
      //id seen before: save only the value to the DB
      println(identValue.value)
      List(identValue)
    } else {
      //new id: save both id and value to the DB
      println(identValue)
      ids = ids + identValue.id
      List(identValue)
    }
}

Source(identValues)
  .via(stateFlow)
  .runWith(Sink.seq)
  .foreach { processed =>
    //processed is the whole Seq[IdentValue] emitted by the flow
    println(processed)
    system.terminate()
  }
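The answer declares var ids inside the () => ... block on purpose: statefulMapConcat takes a factory that Akka calls once per materialization, so every run of the stream starts with its own empty set instead of sharing one. The plain-Scala model below (no Akka dependency, not Akka's actual implementation) mimics that wiring; run is a hypothetical helper standing in for one materialization. It also returns hypothetical NewId/KnownId tags instead of printing, which is one possible way to move the database writes out of the stateful function.

```scala
// Plain-Scala model (no Akka) of how statefulMapConcat is wired: it takes a
// factory () => (In => Iterable[Out]). The factory runs once per
// materialization, so the mutable state it captures starts fresh each run.
object StatefulMapConcatModel {
  final case class IdentValue(id: Int, value: String)

  // Hypothetical tags for the two cases in the question.
  sealed trait Decision
  final case class NewId(iv: IdentValue) extends Decision
  final case class KnownId(iv: IdentValue) extends Decision

  // Same shape as the function passed to statefulMapConcat in the answer.
  val factory: () => IdentValue => List[Decision] = { () =>
    var ids = Set.empty[Int] // private to this invocation of the factory
    identValue =>
      if (ids.contains(identValue.id)) List(KnownId(identValue))
      else {
        ids += identValue.id
        List(NewId(identValue))
      }
  }

  // Simulates one materialization: call the factory once, then apply the
  // resulting function to every element and flatten the outputs.
  def run(input: Seq[IdentValue]): Seq[Decision] = {
    val f = factory()
    input.flatMap(f)
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(IdentValue(1, "x"), IdentValue(2, "y"), IdentValue(1, "z"))
    println(run(input))
    println(run(input)) // same output: the second run starts with an empty Set
  }
}
```

Because the tags are ordinary values, a later stage (for example a mapAsync that calls the database) could pattern match on them; whether that split fits depends on your database client.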