基于akka流条件的替代流 [英] Alternative flows based on condition for akka stream
问题描述
有一个具有自定义流的流,在某个阶段,我想拆分该流并有两个替代的数据处理,稍后将再次合并。
Have a stream with custom flows and at a certain stage I want to split the stream and have two alternative data handling which will merge again later.
Eg
-> F3 -> F6
Src -> F1 -> F2 > Merge -> Sink
-> F4 -> F5
F2
应该有条件数据包含格式为 A
的数据,则应转到流 F3
,否则转到 F4
。
F2
should have a condition saying if data contains format A
then it should go to flow F3
, else go to F4
.
据我所知,每个流在每个方向上只能有一个端口(如果是双向,则只有两个)-那么我该如何支持这样的流?
As far as I can see, each flow can only have one port in each direction (or two if bidi) - so how can I support such a flow?
推荐答案
您可以使用广播
拆分流,则可以在每个流上使用 filter
或 collect
来过滤所需的数据。
You can use Broadcast
to split the stream, then you will able to use filter
or collect
on each of streams to filter required data.
val split = builder.add(Broadcast[Int](2))
Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
-> filterCondB -> F4 -> F5 -> Merge
此外,还有 Partition
阶段处理输出端口的数量以及从值到端口号 f的映射函数:T => Int
。
Also, there is Partition
stage which handles the number of output ports and the map function from value to port number f: T => Int
.
val portMapper(value: T): Int = value match {
case CondA => 0
case CondB => 1
}
val split = builder.add(Partition[T](2, portMapper))
Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
split -> F4 -> F5 -> Merge
也许有一些更简单的方法。
Maybe there is some simpler way.
这篇关于基于akka流条件的替代流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!