使用Akka流有条件地跳过流 [英] Conditionally skip flow using akka streams

查看:105
本文介绍了使用Akka流有条件地跳过流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用akka流,但由于该流无法处理某些值,因此需要有条件跳过该图的一部分。具体来说,我有一个流程,它接受一个字符串并发出http请求,但是当字符串为空时,服务器无法处理这种情况。但是我只需要返回一个空字符串即可。有没有一种方法可以执行此操作而不必经历http请求,而知道它会失败?我基本上是这样的:

I'm using akka streams and I have a segment of my graph that I need to conditionally skip because the flow can't handle certain values. Specifically, I have a flow that takes a string and makes http requests, but the server can't handle the case when the string is empty. But I need to just return an empty string instead. Is there a way of doing this without having to go through the http request knowing it will fail? I basically have this:

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)

我唯一能想到的就是在httpResponse流中捕获400错误并返回a默认值。但是,我希望能够避免因事先知道会失败的请求而打入服务器的开销。

The only thing I can think of doing is catching the 400 error in my httpResponse flow and returning a default value. But I'd like to be able to avoid the overhead of hitting the server for a request I know is going to fail beforehand.

推荐答案

维克多·巴生(Viktor Klang)的解决方案简洁明了,优雅 。我只是想演示使用图的替代方法。

Viktor Klang's solution is concise and elegant. I just wanted to demonstrate an alternative using Graphs.

您可以将字符串的源分成两个流,并为一个有效字符串过滤一个流,为另一个无效字符串过滤另一个流。然后合并结果( 跨流) 。

You can split your source of Strings into two streams and filter one stream for valid Strings and the other stream for invalid Strings. Then merge the results ("cross the streams").

基于文档

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._

  val source = Source(List("1", "2", "", "3", "4"))
  val sink : Sink[String,_] = ???

  val bcast = builder.add(Broadcast[String](2))
  val merge = builder.add(Merge[String](2))

  val validReq =   Flow[String].filter(_.size > 0)
  val invalidReq = Flow[String].filter(_.size == 0)

  val httpRequest: Flow[String, HttpRequest, _] = ???
  val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
  val httpResponse: Flow[HttpResponse, String, _] = ???
  val someHttpTransformation = httpRequest via makeHttpCall via httpResponse

  source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
            bcast ~>      invalidReq                    ~> merge
  ClosedShape
})

注意:此解决方案拆分了流,因此Sink可能会处理String值,结果的顺序与基于输入的预期顺序不同。

Note: this solution splits the stream, therefore the Sink may process String value results in a different order than is expected based on the inputs.

这篇关于使用Akka流有条件地跳过流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆