使用 akka 流有条件地跳过流 [英] Conditionally skip flow using akka streams

查看:25
本文介绍了使用 akka 流有条件地跳过流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 akka 流,并且我有一段我需要有条件地跳过的图表,因为流程无法处理某些值.具体来说,我有一个接受字符串并发出 http 请求的流,但是当字符串为空时,服务器无法处理这种情况.但我只需要返回一个空字符串.有没有办法做到这一点而不必通过 http 请求知道它会失败?我基本上有这个:

I'm using akka streams and I have a segment of my graph that I need to conditionally skip because the flow can't handle certain values. Specifically, I have a flow that takes a string and makes http requests, but the server can't handle the case when the string is empty. But I need to just return an empty string instead. Is there a way of doing this without having to go through the http request knowing it will fail? I basically have this:

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)

我唯一能想到的就是在我的 httpResponse 流中捕获 400 错误并返回一个默认值.但我希望能够避免因我事先知道会失败的请求而访问服务器的开销.

The only thing I can think of doing is catching the 400 error in my httpResponse flow and returning a default value. But I'd like to be able to avoid the overhead of hitting the server for a request I know is going to fail beforehand.

推荐答案

Viktor Klang 的解决方案简洁而优雅.我只是想展示一个使用 Graphs 的替代方案.

Viktor Klang's solution is concise and elegant. I just wanted to demonstrate an alternative using Graphs.

您可以将字符串源拆分为两个流,并过滤一个流为有效字符串,另一个流为无效字符串.然后合并结果(跨流").

You can split your source of Strings into two streams and filter one stream for valid Strings and the other stream for invalid Strings. Then merge the results ("cross the streams").

基于文档:

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._

  val source = Source(List("1", "2", "", "3", "4"))
  val sink : Sink[String,_] = ???

  val bcast = builder.add(Broadcast[String](2))
  val merge = builder.add(Merge[String](2))

  val validReq =   Flow[String].filter(_.size > 0)
  val invalidReq = Flow[String].filter(_.size == 0)

  val httpRequest: Flow[String, HttpRequest, _] = ???
  val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
  val httpResponse: Flow[HttpResponse, String, _] = ???
  val someHttpTransformation = httpRequest via makeHttpCall via httpResponse

  source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
            bcast ~>      invalidReq                    ~> merge
  ClosedShape
})

注意:此解决方案拆分流,因此接收器可能会以与基于输入的预期不同的顺序处理字符串值结果.

Note: this solution splits the stream, therefore the Sink may process String value results in a different order than is expected based on the inputs.

这篇关于使用 akka 流有条件地跳过流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆