如何确保Akka中的分叉顺序 [英] How to guarantee sequentiality for forks in akka

查看:75
本文介绍了如何确保Akka中的分叉顺序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在为每个(小的)传入消息组创建一个参与者链,以确保它们的顺序处理和管道传递(各组通过通用ID进行区分).问题在于我们的链具有分叉,如A1 -> (A2 -> A3 | A4 -> A5),我们应保证通过A2 -> A3A4 -> A5的消息之间不会出现竞争.当前的遗留解决方案是阻止A1 actor直到当前消息被完全处理(在子链之一中):

We're creating a chain of actors for every (small) incoming group of messages to guarantee their sequential processing and piping (groups are differentiating by common id). The problem is that our chain has forks, like A1 -> (A2 -> A3 | A4 -> A5) and we should guarantee no races between messages going through A2 -> A3 and A4 -> A5. The currrent legacy solution is to block A1 actor til current message is fully processed (in one of sub-chains):

def receive { //pseudocode
    case x => ...
      val f = A2orA4 ? msg
      Await.complete(f, timeout)
}

结果,应用程序中的线程数与正在处理的消息数成正比,无论这些消息是活动的还是只是异步地等待外部服务的某些响应.它与fork-join(或任何其他动态)池一起使用大约两年,但是当然不能与固定池一起使用,并且在高负载的情况下会极大地降低性能.不仅如此,它还会影响GC,因为每个阻塞的fork-actor都会在其中保存冗余的先前消息的状态.

As a result, count of threads in application is in direct ratio to the count of messages, that are in processing, no matter these messages are active or just asynchronously waiting for some response from outer service. It works about two years with fork-join (or any other dynamic) pool but of course can't work with fixed-pool and extremely decrease performance in case of high-load. More than that, it affects GC as every blocked fork-actor holds redundant previous message's state inside.

即使有背压,它创建的线程数也比接收到的消息多N倍(因为流中有N个连续的分叉),这仍然很糟糕,因为处理一条消息需要很长时间,但CPU占用不多.因此,只要有足够的内存,我们就应该处理更多的消息.我想出的第一个解决方案-使链像A1 -> A2 -> A3 -> A4 -> A5一样线性化.有什么更好的吗?

Even with backpressure it creates N times more threads than messages received (as there is N sequential forks in the flow), which is still bad as proceesing of one message takes a long time but not much CPU. So we should process as more messages as we have enough memory for. First solution I came up with - to linearize the chain like A1 -> A2 -> A3 -> A4 -> A5. Is there any better?

推荐答案

更简单的解决方案是将最后收到的消息的未来存储到参与者的状态中,并将其与先前的未来联系起来:

The simpler solution is to store a future for last received message into the actor's state and chain it with previous future:

def receive = process(Future{new Ack}) //completed future
def process(prevAck: Future[Ack]): Receive = { //pseudocode
    case x => ...
        context become process(prevAck.flatMap(_ => A2orA4 ? msg))
}

因此它将创建期货链而没有任何阻碍.期货交易完成(最后一个除外)后,该链将被擦除.

So it will create chain of futures without any blocking. The chain will be erased after futures completion (except the last one).

这篇关于如何确保Akka中的分叉顺序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆