基于 akka 流条件的替代流 [英] Alternative flows based on condition for akka stream

查看:35
本文介绍了基于 akka 流条件的替代流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有一个带有自定义流的流,在某个阶段我想拆分流并有两个替代数据处理,稍后将再次合并.

Have a stream with custom flows and at a certain stage I want to split the stream and have two alternative data handling which will merge again later.

例如

                  -> F3 -> F6 
Src -> F1 -> F2                > Merge -> Sink 
                  -> F4 -> F5

F2 应该有一个条件,如果数据包含格式A,那么它应该去流程F3,否则去F4.

F2 should have a condition saying if data contains format A then it should go to flow F3, else go to F4.

据我所知,每个流在每个方向上只能有一个端口(如果是双向的,则有两个)——那么我如何支持这样的流?

As far as I can see, each flow can only have one port in each direction (or two if bidi) - so how can I support such a flow?

推荐答案

您可以使用 Broadcast 来分割流,然后您将可以使用 filter>collect 在每个流上过滤所需的数据.

You can use Broadcast to split the stream, then you will able to use filter or collect on each of streams to filter required data.

val split = builder.add(Broadcast[Int](2))

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
                   -> filterCondB -> F4 -> F5 -> Merge

此外,还有 Partition 阶段处理输出端口的数量和从值到端口号的映射函数 f: T =>Int.

Also, there is Partition stage which handles the number of output ports and the map function from value to port number f: T => Int.

val portMapper(value: T): Int = value match {
  case CondA => 0
  case CondB => 1
}

val split = builder.add(Partition[T](2, portMapper))

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
             split -> F4 -> F5 -> Merge

也许有一些更简单的方法.

Maybe there is some simpler way.

这篇关于基于 akka 流条件的替代流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆