Akka Stream - 根据流中的元素选择接收器 [英] Akka Stream - Select Sink based on Element in Flow

查看:36
本文介绍了Akka Stream - 根据流中的元素选择接收器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Akka 流创建一个简单的消息传递服务.该服务就像邮件传递一样,其中来自源的元素包括 destinationcontent,例如:

I'm creating a simple message delivery service using Akka stream. The service is just like mail delivery, where elements from source include destination and content like:

case class Message(destination: String, content: String)

并且服务应该根据 destination 字段将消息传递到适当的接收器.我创建了一个 DeliverySink 类来让它有一个名字:

and the service should deliver the messages to appropriate sink based on the destination field. I created a DeliverySink class to let it have a name:

case class DeliverySink(name: String, sink: Sink[String, Future[Done]])

现在,我实例化了两个 DeliverySink,让我称它们为 sinkXsinkY,并根据它们的名称创建了一个地图.在实践中,我想提供一个接收器名称列表,该列表应该是可配置的.

Now, I instantiated two DeliverySink, let me call them sinkX and sinkY, and created a map based on their name. In practice, I want to provide a list of sink names and the list should be configurable.

我面临的挑战是如何根据 destination 字段动态选择合适的接收器.

The challenge I'm facing is how to dynamically choose an appropriate sink based on the destination field.

最终,我想将 Flow[Message] 映射到接收器.我试过了:

Eventually, I want to map Flow[Message] to a sink. I tried:

val sinkNames: List[String] = List("sinkX", "sinkY")
val sinkMapping: Map[String, DeliverySink] = 
   sinkNames.map { name => name -> DeliverySink(name, ???)}.toMap
Flow[Message].map { msg => msg.content }.to(sinks(msg.destination).sink)

但是,显然这不起作用,因为我们不能在地图之外引用 msg...

but, obviously this doesn't work because we can't reference msg outside of map...

我想这不是一个正确的方法.我也考虑过将 filterbroadcast 一起使用,但是如果目标缩放到 100,我无法键入每个路由.实现目标的正确方法是什么?

I guess this is not a right approach. I also thought about using filter with broadcast, but if the destination scales to 100, I cannot type every routing. What is a right way to achieve my goal?

理想情况下,我想让目的地动态化.因此,我无法在过滤器或路由逻辑中静态键入所有目的地.如果目标接收器尚未连接,它也应动态创建一个新接收器.

Ideally, I would like to make destinations dynamic. So, I cannot statically type all destinations in filter or routing logic. If a destination sink has not been connected, it should create a new sink dynamically too.

推荐答案

如果您必须使用多个接收器

Sink.combine 将直接满足您现有的要求.如果您在每个 Sink 之前附加适当的 Flow.filter,那么它们只会收到适当的消息.

Sink.combine would directly suite your existing requirements. If you attach an appropriate Flow.filter before each Sink then they'll only receive the appropriate messages.

不要使用多个接收器

总的来说,我认为流的结构和内容包含业务逻辑是糟糕的设计.您的流应该是在普通 scala/java 代码中的业务逻辑之上的背压并发的薄饰面.

In general I think it is bad design to have the structure, and content, of streams contain business logic. Your stream should be a thin veneer for back-pressured concurrency on top of business logic which is in ordinary scala/java code.

在这种特殊情况下,我认为最好将目标路由包装在单个 Sink 内,并且逻辑应在单独的函数内实现.例如:

In this particular case, I think it would be best to wrap your destination routing inside of a single Sink and the logic should be implemented inside of a separate function. For example:

val routeMessage : (Message) => Unit = 
  (message) => 
    if(message.destination equalsIgnoreCase "stdout")
      System.out println message.content
    else if(message.destination equalsIgnoreCase "stderr")
      System.err println message.content

val routeSink : Sink[Message, _] = Sink foreach routeMessage

请注意,现在测试我的 routeMessage 容易得多,因为它不在流内:我不需要任何 akka testkit东西"来测试 routeMessage.如果我的并发设计要改变,我也可以将函数移到 FutureThread.

Note how much easier it is to now test my routeMessage since it isn't inside of the stream: I don't need any akka testkit "stuff" to test routeMessage. I can also move the function to a Future or a Thread if my concurrency design were to change.

许多目的地

如果你有很多目的地,你可以使用Map.例如,假设您将消息发送到 AmazonSQS.您可以定义一个函数来将队列名称转换为队列 URL 并使用该函数来维护已创建名称的映射:

If you have many destinations you can use a Map. Suppose, for example, you are sending your messages to AmazonSQS. You could define a function to convert a Queue Name to Queue URL and use that function to maintain a Map of already created names:

type QueueName = String

val nameToRequest : (QueueName) => CreateQueueRequest = ???  //implementation unimportant

type QueueURL = String

val nameToURL : (AmazonSQS) => (QueueName) => QueueURL = {
  val nameToURL = mutable.Map.empty[QueueName, QueueURL]

  (sqs) => (queueName) => nameToURL.get(queueName) match {
    case Some(url) => url
    case None => {
      sqs.createQueue(nameToRequest(queueName))
      val url = sqs.getQueueUrl(queueName).getQueueUrl()

      nameToURL put (queueName, url)

      url
    }
  }
}

现在你可以在单个 Sink 中使用这个非流函数:

Now you can use this non-stream function inside of a singular Sink:

val sendMessage : (AmazonSQS) => (Message) => Unit = 
  (sqs) => (message) => 
    sqs sendMessage {
      (new SendMessageRequest())
        .withQueueUrl(nameToURL(sqs)(message.destination))
        .withMessageBody(message.content)
    }

val sqs : AmazonSQS = ???

val messageSink = Sink foreach sendMessage(sqs)

旁注

对于 destination,您可能想使用 String 以外的其他内容.coproduct 通常更好,因为它们可以与 case 语句一起使用,并且您将获得有用的编译器如果您错过了其中一种可能性,则会出错:

For destination you probably want to use something other than String. A coproduct is usually better because they can be used with case statements and you'll get helpful compiler errors if you miss one of the possibilities:

sealed trait Destination

object Out extends Destination
object Err extends Destination
object SomethingElse extends Destination

case class Message(destination: Destination, content: String)

//This function won't compile because SomethingElse doesn't have a case
val routeMessage : (Message) => Unit = 
  (message) => message.destination match {
    case Out =>
      System.out.println(message.content)
    case Err =>
      System.err.println(message.content)
  }

这篇关于Akka Stream - 根据流中的元素选择接收器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆