Akka Streams: State in a flow

Question

I want to use Akka Streams to read multiple big files and process each line. Imagine that each line consists of an (identifier -> value) pair. If a new identifier is found, I want to save it and its value in the database; otherwise, if the identifier has already been seen while processing the stream of lines, I want to save only the value. For that, I think I need some kind of recursive stateful flow that keeps the identifiers already found in a Map. I think this flow would receive a pair of (newLine, contextWithIdentifiers).
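Outside of streams, the behavior described above is a left fold whose accumulator is the set of identifiers seen so far. That accumulator plays the role of contextWithIdentifiers in the (newLine, contextWithIdentifiers) pair. A Set is enough here because only membership matters. The following is a minimal plain-Scala sketch. It assumes each line looks like "id -> value". The line format and the names Action, SaveIdAndValue, SaveValue, parse and plan are all hypothetical, chosen for illustration:

```scala
// Hypothetical actions: what would be written to the database for each line.
sealed trait Action
final case class SaveIdAndValue(id: String, value: String) extends Action
final case class SaveValue(id: String, value: String) extends Action

// Assumed line format: "identifier -> value"; lines without "->" are skipped.
def parse(line: String): Option[(String, String)] =
  line.split("->", 2) match {
    case Array(id, value) => Some((id.trim, value.trim))
    case _                => None
  }

// Fold over the lines, threading the set of identifiers seen so far.
def plan(lines: Iterator[String]): Vector[Action] =
  lines
    .flatMap(line => parse(line))
    .foldLeft((Set.empty[String], Vector.empty[Action])) {
      case ((seen, actions), (id, value)) =>
        if (seen.contains(id)) (seen, actions :+ SaveValue(id, value))
        else (seen + id, actions :+ SaveIdAndValue(id, value))
    }
    ._2
```

For example, plan(Iterator("a -> 1", "b -> 2", "a -> 3")) yields SaveIdAndValue for a and b, then SaveValue for the second a. In a stream the same accumulator moves inside the processing stage instead of being threaded by hand. Either way, the set of seen identifiers has to fit in memory.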

I've just started to look into Akka Streams. I think I can manage the stateless processing myself, but I have no clue how to keep the contextWithIdentifiers. I'd appreciate any pointers in the right direction.

Answer

Maybe something like statefulMapConcat can help you:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Random.nextInt
import scala.concurrent.ExecutionContext.Implicits.global

//Akka 2.6+: an implicit ActorSystem is enough to materialize the stream
//(older versions also need: implicit val materializer = ActorMaterializer())
implicit val system: ActorSystem = ActorSystem()

//encapsulating your input
case class IdentValue(id: Int, value: String)
//some randomly generated input: 20 elements with ids in 0..4
val identValues = List.fill(20)(IdentValue(nextInt(5), "valueHere"))

val stateFlow = Flow[IdentValue].statefulMapConcat { () =>
  //state with already processed ids; created once per materialization
  var ids = Set.empty[Int]
  identValue => if (ids.contains(identValue.id)) {
    //known id: save only the value to the DB
    println(identValue.value)
    List(identValue)
  } else {
    //new id: save both id and value to the DB
    println(identValue)
    ids = ids + identValue.id
    List(identValue)
  }
}

Source(identValues)
  .via(stateFlow)
  .runWith(Sink.seq)
  .onComplete { result =>
    //Success(all elements) or Failure(error)
    println(result)
    system.terminate()
  }
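The () => wrapper is what keeps the ids set safe. statefulMapConcat calls that factory once each time the stream is materialized, so every run starts with an empty set. The function the factory returns is then applied to each element, and the lists it returns are concatenated into the output. The sketch below models that contract using only the standard library, so the behavior can be checked without Akka. runStage and firstOccurrences are hypothetical helpers for illustration, not part of the Akka API:

```scala
// Models statefulMapConcat: call the factory once per "materialization",
// then map every element through the resulting function and flatten.
def runStage[A, B](factory: () => A => List[B])(input: List[A]): List[B] = {
  val step = factory() // fresh state for this run
  input.flatMap(step)
}

// A variant of the answer's stateFlow that emits each id only the first time
// it appears; returning Nil drops the element, List(x) emits it.
val firstOccurrences: () => Int => List[Int] = { () =>
  var ids = Set.empty[Int] // state lives inside the factory
  id =>
    if (ids.contains(id)) Nil
    else { ids += id; List(id) }
}

val run1 = runStage(firstOccurrences)(List(1, 2, 1, 3, 2))
val run2 = runStage(firstOccurrences)(List(2, 2, 4))
```

Here run1 is List(1, 2, 3) and run2 is List(2, 4). Suppose var ids were declared outside the () => instead. The second run would then still remember the ids from the first one and return only List(4). In a real stream, sharing one mutable set across concurrently running materializations would also be a data race.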
