基于akka流条件的替代流 [英] Alternative flows based on condition for akka stream

查看:82
本文介绍了基于akka流条件的替代流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有一个具有自定义流的流,在某个阶段,我想拆分该流并有两个替代的数据处理,稍后将再次合并。

Have a stream with custom flows and at a certain stage I want to split the stream and have two alternative data handling which will merge again later.

Eg

                  -> F3 -> F6 
Src -> F1 -> F2                > Merge -> Sink 
                  -> F4 -> F5

F2 应该有条件数据包含格式为 A 的数据,则应转到流 F3 ,否则转到 F4

F2 should have a condition saying if data contains format A then it should go to flow F3, else go to F4.

据我所知,每个流在每个方向上只能有一个端口(如果是双向,则只有两个)-那么我该如何支持这样的流?

As far as I can see, each flow can only have one port in each direction (or two if bidi) - so how can I support such a flow?

推荐答案

您可以使用广播拆分流,则可以在每个流上使用 filter collect 来过滤所需的数据。

You can use Broadcast to split the stream, then you will able to use filter or collect on each of streams to filter required data.

val split = builder.add(Broadcast[Int](2))

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
                   -> filterCondB -> F4 -> F5 -> Merge

此外,还有 Partition 阶段处理输出端口的数量以及从值到端口号 f的映射函数:T => Int

Also, there is Partition stage which handles the number of output ports and the map function from value to port number f: T => Int.

val portMapper(value: T): Int = value match {
  case CondA => 0
  case CondB => 1
}

val split = builder.add(Partition[T](2, portMapper))

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
             split -> F4 -> F5 -> Merge

也许有一些更简单的方法。

Maybe there is some simpler way.

这篇关于基于akka流条件的替代流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆