Akka synchronizing timestamped messages from several actors

Problem description

Imagine the following architecture. An Akka actor receives push messages over a websocket. Each message carries a timestamp, and consecutive timestamps are 1 minute apart, although messages with the same timestamp can arrive more than once. These messages are then broadcast to, for example, three further actors (ma). They calculate metrics and push the results on to a single actor (c).

For ma I defined a TimeSeriesBuffer that accepts a write only if the new entity's timestamp directly follows the previous one. After a successful push to the buffer, each ma emits its metric, which goes to c. c can only change its state once it has all three metrics. I therefore defined a trait Synchronizable and then a SynchronizableTimeSeriesBuffer with a "master-slave" architecture.
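
A minimal sketch of a buffer with that acceptance rule, in plain Scala without Akka. It is not the code from the gist: the name ConsecutiveBuffer, the intervalMillis parameter and the timestampOf function are assumptions made for illustration, as is the choice to reject duplicates and gaps rather than handle them some other way.

```scala
import scala.collection.mutable

// Hypothetical sketch, not the TimeSeriesBuffer from the gist.
// Assumes timestamps are epoch milliseconds and a fixed interval between elements.
final class ConsecutiveBuffer[A](intervalMillis: Long)(timestampOf: A => Long) {
  private val queue = mutable.Queue.empty[A]

  /** Appends `elem` only if the buffer is empty or `elem` is exactly
    * one interval later than the last stored element. Returns whether it was stored. */
  def push(elem: A): Boolean = {
    val accepted = queue.lastOption.forall { last =>
      timestampOf(elem) - timestampOf(last) == intervalMillis
    }
    if (accepted) queue += elem
    accepted
  }

  /** Timestamp of the newest stored element, if any. */
  def lastTimestamp: Option[Long] = queue.lastOption.map(timestampOf)

  def length: Int = queue.length
}
```

With a one-minute interval, `new ConsecutiveBuffer[(Long, Double)](60000L)(_._1)` would accept `(0L, 1.0)` and then `(60000L, 2.0)`, but reject a repeated `(0L, 1.0)` or an element that skips a minute.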

Every push to any buffer triggers a check for whether the buffers of all three SynchronizableTimeSeriesBuffer instances now hold a new element with the same timestamp, which can then be emitted to c as a single message.
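
The check described above can also be sketched as a join keyed by timestamp, which may be simpler than coordinating three buffers. This is only an illustration in plain Scala: Metric, TimestampJoiner and the source names are hypothetical, and the policy of discarding older incomplete timestamps once a newer one completes is an assumption, not something taken from the gist.

```scala
// Hypothetical sketch: Metric and TimestampJoiner are illustrative names.
final case class Metric(source: String, timestamp: Long, value: Double)

final class TimestampJoiner(sources: Set[String]) {
  // timestamp -> (source -> metric) for timestamps that are still incomplete
  private var pending = Map.empty[Long, Map[String, Metric]]
  private var lastEmitted = Long.MinValue

  /** Records `m`. Returns the full set of metrics for its timestamp
    * once every expected source has reported, otherwise None. */
  def offer(m: Metric): Option[Map[String, Metric]] =
    if (m.timestamp <= lastEmitted || !sources.contains(m.source)) {
      None // late duplicate or unknown source: ignore
    } else {
      val updated = pending.getOrElse(m.timestamp, Map.empty[String, Metric]) + (m.source -> m)
      if (updated.keySet == sources) {
        // Complete: emit, and drop this timestamp and anything older (assumed policy).
        lastEmitted = m.timestamp
        pending = pending.filter { case (ts, _) => ts > m.timestamp }
        Some(updated)
      } else {
        pending = pending.updated(m.timestamp, updated)
        None
      }
    }

  def pendingCount: Int = pending.size
}
```

Inside actor c, each incoming metric would be passed to `offer`, and the state change would happen only when it returns `Some(...)`.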

So here are the questions:

1) Is this solution too complicated?

2) Is there a better way to do it in Scala and Akka?

3) Why is it neither fast nor parallel when the messages, instead of being received "one by one", are loaded from the db in a big batch and fed to the system in order to backtest the metrics? (One of the buffers fills much faster than the others, while another stays at length 0.) I assume it has something to do with Akka's dispatcher/mailbox settings.

I created a gist with the relevant code: https://gist.github.com/ifif14/18b5f85cd638af7023462227cd595a2f

I would much appreciate the community's help in solving this nontrivial case.

Thanks in advance,

Igor

Recommended answer

Thanks so much for your answer. The cutting-off part is something I also implemented in the Synchronizable trait.

  // Clean up slaves: drop elements from the front of each slave queue
  // while they are older than the master's latest element.
  master_last_timestamp match {
    case Some(ts) =>
      slaves.foreach { s =>
        while (s.queue.nonEmpty && s.getElementTimestamp(s.queue.front) < ts) {
          s.dequeue()
        }
        // Alternative: val els = s.dequeueAll { queue_el => s.getElementTimestamp(queue_el) < ts }
      }
    case None => () // no master element yet, nothing to clean up
  }

The reason I started implementing the buffer is that I expect to use it a lot in the system, and I don't want to write this part again for each actor that needs it. It seems easier to have a blueprint that does it.

But a more important reason is that one buffer is being filled either much more slowly than the other two or not at all, even though all three are filled by the same kind of actor (just different instances, and the computation time should be pretty much the same). Only after the other two actors have emitted all the messages that were "passed" in from the database does the third one start receiving them. It feels to me as if this one actor is just not getting processor time, so I think a dispatcher setting may be affecting this. Are you familiar with this?

I would also expect the dispatcher to work more like round-robin, giving each actor a little execution time, but it ends up serving only a limited number of actors and then jumping to the next ones, even though they should all receive the initial messages at roughly the same time, since there is a broadcaster.
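
One setting that bears on this expectation is the dispatcher's `throughput`, which the Akka documentation describes as the maximum number of messages processed per actor before the thread moves on to the next actor (1 being the fairest). The toy model below is a sketch in plain Scala, not Akka's scheduler: it assumes a single worker thread and mailboxes that are already full, and ThroughputModel and processingOrder are hypothetical names. It only shows how a batch size changes the interleaving, and whether this is what causes the behaviour described here is not established.

```scala
import scala.collection.mutable

// Toy model only: one worker thread visiting pre-filled mailboxes in name order.
// Real dispatchers run on a thread pool and differ in detail.
object ThroughputModel {
  /** Returns the order in which messages are processed when the worker
    * handles up to `throughput` messages per actor before moving on. */
  def processingOrder(mailboxes: Map[String, Int], throughput: Int): Vector[String] = {
    require(throughput > 0, "throughput must be positive")
    val remaining = mutable.LinkedHashMap(mailboxes.toSeq.sortBy(_._1): _*)
    val order = Vector.newBuilder[String]
    while (remaining.nonEmpty) {
      for (name <- remaining.keys.toList) {
        val batch = math.min(throughput, remaining(name))
        (1 to batch).foreach(_ => order += name)
        if (remaining(name) == batch) remaining.remove(name)
        else remaining(name) = remaining(name) - batch
      }
    }
    order.result()
  }
}
```

With three mailboxes of two messages each, a throughput of 1 gives a strict round-robin order, while a throughput of 2 drains each mailbox completely before the next one is visited.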

I have read the Akka documentation on dispatchers and mailboxes, but I still don't understand how to do it.

Thanks,

Igor
