演员邮箱溢出.斯卡拉 [英] Actors Mailbox Overflow. Scala

查看:109
本文介绍了演员邮箱溢出.斯卡拉的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前在斯卡拉与两个演员合作.一个是 producer ,它产生一些数据并将其发送到 parcer .生产者通过消息发送HashMap[String,HashMap[Object,List[Int]]](连同 this 标记发送者):

Im currently working with two actors in scala. One, the producer, produces some data and sends it to a parcer. The producer sends a HashMap[String,HashMap[Object,List[Int]]] through a message (along with this to mark the sender):

parcer ! (this,data)

解析器一直在等待消息,例如:

The parser is constantly waiting for messages like so:

def act(){
    loop{
      react{
        case (producer, data)=> parse(data);
      }
    }
}

该程序在正常情况下可以完美运行.问题在于大量的数据和发送的许多消息(散列有大约10 ^ 4个元素,内部散列有大约100个元素,列表长100个),程序崩溃了.它没有显示错误或异常.只是停下来.

The program runs perfectly in normal circunstances. The problem comes with big volumes of data and many messages sent (The hash has about 10^4 elements, the inner hash about 100 elements and the list is 100 long), the program crashes. It shows no Error nor Exception. It just stopps.

问题似乎是我的生产者比解析器工作得快得多(并且目前我不希望有多个解析器).

The problem seems to be that my producer works much faster than the parser (and for the moment I don't want more than one parser).

阅读 scala邮箱大小限制后,我想知道我的解析器的邮箱是否已达到限制.该帖子还提供了一些解决方案,但我首先需要确保这是问题所在.我该如何测试?

After reading scala mailbox size limit I wonder if my parser's mailbox is reaching it's limit. The post also offers some solutions, but I first need to make sure this is the problem. How can I test this?

有没有办法知道演员的记忆极限?怎样读取邮箱中的已用/可用内存?

Is there a way to know the actor's memory limit? What about reading the used/free memory in the mailbox?

也欢迎未在该链接中发布的任何有关工作流的建议

Any suggestions for the workflow that haven't been posted in that link are also welcome.

谢谢

推荐答案

首先,由于Scala actors框架始终跟踪发件人,因此您无需显式传递发件人.您始终可以使用方法sender访问邮件的发件人.

First, you need not pass the sender explicitly, as the sender is tracked by the Scala actors framework anyway. You can always access the sender of a message using the method sender.

在这里可以看到:

As can be seen here: scala.actors.MQueue, an actor's mailbox is implemented as a linked list and is therefore only bounded by the size of the heap.

不过,如果您担心生产者的速度非常快而消费者的速度却很慢,我建议您探索一种调节机制.但我不建议从已接受的答案回答问题 scala邮箱大小限制.

Still, if you are concerned that the producer is very fast and the consumer is very slow, I suggest that you explore a throttling mechanism. But I wouldn't recommend the approach from the accepted answer to question scala mailbox size limit.

通常,在系统承受重压时尝试发送过载消息似乎不是一个好主意.如果您的系统太忙而无法检查过载怎么办?如果过载消息的接收者太忙而无法采取行动怎么办?另外,对我来说,删除消息听起来不是一个好主意.我认为您希望所有工作项目都得到可靠处理.

Trying to send overload messages when the system is heavily stressed doesn't seem to be a good idea, generally. What if your system is too busy to check for overload? What if the receiver of the overload message is too busy to act on it? Also, dropping messages doesn't sound like a very good idea to me. I would think that you want all your work items processed reliably.

此外,我不会依靠mailboxSize来确定负载.您无法区分不同的消息类型,只能从使用者内部进行检查,而不能从生产者进行检查.

Also, I wouldn't rely on the mailboxSize to determine load. You cannot distinguish different message types and you can only check from within the consumer itself, not from the producer.

我建议使用一种方法,让消费者知道自己可以处理更多的工作.

I suggest using an approach where the consumer requests more work, when he knows he can handle it.

下面是一个简单的示例,该示例如何实现.

Below is a simple example how it could be implemented.

import scala.actors._
import Actor._

object ConsumerProducer {
  def main(args: Array[String]) {
    val producer = new Producer(Iterator.range(0, 10000))
    val consumer = new Consumer(producer)
  }
}

case class Produce(count: Int)
case object Finished

class Producer[T](source: Iterator[T]) extends Actor {

  start

  def act() {
    loopWhile(source.hasNext) {
      react {
        case Produce(n: Int) => produce(n)
      } 
    }
  }

  def produce(n: Int) {
    println("producing " + n)
    var remaining = n
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
    if(!source.hasNext) sender ! Finished
  }
}

class Consumer(producer: Actor) extends Actor {

  start

  private var remaining = 0

  def act() {
    requestWork()
    consume()
  }

  def consume(): Nothing = react {
    case Finished => println("Finished")
    case n: Int => work(n); requestWork(); consume()
  }

  def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }

  def work(n: Int) = {
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
    remaining -= 1
  }
}

这篇关于演员邮箱溢出.斯卡拉的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆