为什么我的《派遣演员》在Akka中按比例缩小? [英] Why is my Dispatching on Actors scaled down in Akka?

查看:43
本文介绍了为什么我的《派遣演员》在Akka中按比例缩小?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个由100个正在运行的Actor组成的actor池,这些池共享一个其CorePoolSize设置为100的偷窃调度程序。但是现在,当向其中一个Actor发送19条消息时,这19条消息并没有与19个Actor并行化, 5条消息并行运行。完成这5条消息后,接下来的5条消息将由这5个Actor再次处理,依此类推。为什么我的19条消息没有并行运行,我在这里错过了什么?

I have an actor pool of 100 running Actors which share a work stealing dispatcher with its CorePoolSize set to 100. But now when sending 19 messages to one of the Actors the 19 messages aren't parallelized to 19 Actors, there are only 5 messages running in parallel. When these 5 messages are finished, the next 5 messages are processed by these same 5 Actors again and so on. Why aren't my 19 messages running in parallel, what am i missing here?

我的代码基本上像这样:

My code looks basically like this:

object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  println("Actor: " + name + " Received: " + num)
                               Thread.sleep(10000)
                            }
    }
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = self.shutdownLinkedActors()
}

val supervisor = actorOf(new TestActorManager with CyclicLoadBalancing {
val testActors = (1 until 100 toList) map (i => actorOf(new TestActor(i)))   
}).start

println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

(1 until 20 toList) foreach { testActor ! _ }

输出:

Actor: 4 Received: 16
Actor: 3 Received: 17
Actor: 1 Received: 19
Actor: 59 Received: 1
Actor: 2 Received: 18

// 10 secs. are passing..

Actor: 4 Received: 15
Actor: 3 Received: 14
Actor: 1 Received: 13
Actor: 59 Received: 2
Actor: 2 Received: 12

// 10 secs. are passing..

Actor: 4 Received: 11
Actor: 3 Received: 10
Actor: 59 Received: 3
Actor: 2 Received: 8
Actor: 1 Received: 9

// 10 secs. are passing..

Actor: 4 Received: 7
Actor: 3 Received: 6
Actor: 59 Received: 4
Actor: 2 Received: 5

编辑:我正在使用Akka 1.0

edit: i'm using Akka 1.0

推荐答案

感谢您的询问,我对瓶颈进行了本地化,并在向Akka管理员的提交中解决了该问题:

thanks for your inquiry, I localized the bottleneck and fixed it in this commit to Akka master:

https://github.com/akka/akka/commit/e4e99ef56399e892206ce4a46b9a9107da6c7770

它将在Akka 1.1-RC1中发布

It will be released in Akka 1.1-RC1

干杯,

这篇关于为什么我的《派遣演员》在Akka中按比例缩小?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆