基于 akka 流条件的替代流 [英] Alternative flows based on condition for akka stream
问题描述
有一个带有自定义流的流,在某个阶段我想拆分流并有两个替代数据处理,稍后将再次合并.
Have a stream with custom flows and at a certain stage I want to split the stream and have two alternative data handling which will merge again later.
例如
-> F3 -> F6
Src -> F1 -> F2 > Merge -> Sink
-> F4 -> F5
F2
应该有一个条件,如果数据包含格式A
,那么它应该去流程F3
,否则去F4
.
F2
should have a condition saying if data contains format A
then it should go to flow F3
, else go to F4
.
据我所知,每个流在每个方向上只能有一个端口(如果是双向的,则有两个)——那么我如何支持这样的流?
As far as I can see, each flow can only have one port in each direction (or two if bidi) - so how can I support such a flow?
推荐答案
您可以使用 Broadcast
来分割流,然后您将可以使用 filter
或 >collect
在每个流上过滤所需的数据.
You can use Broadcast
to split the stream, then you will able to use filter
or collect
on each of streams to filter required data.
val split = builder.add(Broadcast[Int](2))
Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
-> filterCondB -> F4 -> F5 -> Merge
此外,还有 Partition
阶段处理输出端口的数量和从值到端口号的映射函数 f: T =>Int
.
Also, there is Partition
stage which handles the number of output ports and the map function from value to port number f: T => Int
.
val portMapper(value: T): Int = value match {
case CondA => 0
case CondB => 1
}
val split = builder.add(Partition[T](2, portMapper))
Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
split -> F4 -> F5 -> Merge
也许有一些更简单的方法.
Maybe there is some simpler way.
这篇关于基于 akka 流条件的替代流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!