将 Akka Stream Source 一分为二 [英] Split Akka Stream Source into two
问题描述
我有一个 Akka Streams Source
,我想根据谓词将其拆分为两个源.
I have an Akka Streams Source
which I want to split into two sources according to a predicate.
例如有一个来源(类型被有意简化):
E.g. having a source (types are simplified intentionally):
val source: Source[Either[Throwable, String], NotUsed] = ???
还有两种方法:
def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???
我希望能够根据 _.isRight
谓词拆分 source
并将右侧部分传递给 handleSuccess
方法和左侧handleFailure
方法的一部分.
I would like to be able to split the source
according to _.isRight
predicate and pass the right part to handleSuccess
method and left part to handleFailure
method.
我尝试使用 Broadcast
拆分器,但它最后需要 Sink
s.
I tried using Broadcast
splitter but it requires Sink
s at the end.
推荐答案
编辑:这个其他答案与 divertTo
是一个比我的更好的解决方案,IMO.我将保留我的答案以供后人使用.
Edit: this other answer with divertTo
is a better solution than mine, IMO. I'll leave my answer as-is for posterity.
原始答案:
这在 akka-stream-contrib
中实现为 PartitionWith
.将此依赖项添加到 SBT 以将其添加到您的项目中:
This is implemented in akka-stream-contrib
as PartitionWith
. Add this dependency to SBT to pull it in to your project:
libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"```
`PartitionWith` is shaped like a `Broadcast(2)`, but with potentially different types for each of the two outlets. You provide it with a predicate to apply to each element, and depending on the outcome, they get routed to the applicable outlet. You can then attach a `Sink` or `Flow` to each of these outlets independently as appropriate. Building on [cessationoftime's example](https://stackoverflow.com/a/39744355/147806), with the `Broadcast` replaced with a `PartitionWith`:
val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))
val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)
((_, _, _)) { implicit b => (s, l, r) =>
import GraphDSL.Implicits._
val pw = b.add(
PartitionWith.apply[Either[Throwable, String], Throwable, String](identity)
)
eitherSource ~> pw.in
pw.out0 ~> leftSink
pw.out1 ~> rightSink
ClosedShape
})
val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)
这篇关于将 Akka Stream Source 一分为二的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!