如何取消Akka演员? [英] How to Cancel an Akka actor?

查看:98
本文介绍了如何取消Akka演员?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个akka演员(工人)接收请求并对其进行回复。请求处理可能需要3到60分钟。呼叫者(也是演员)当前正在使用!!!并等待future.get,但是可以根据需要更改Caller actor的设计。另外,我当前正在使用EventDriven调度程序。

I have an akka actor(worker) that receives a request and replies to it. The request processing can take 3-60 minutes. Caller(also an actor) is currently using !!! and waiting on future.get, however the design of Caller actor can be changed if required. Also, I'm currently using EventDriven dispatcher.

我如何取消(用户启动)请求处理,以便释放工作者角色并返回到就绪状态接收新请求?我希望找到一种类似于java.util.concurrent.Future的cancel方法,但在Akka 1.1.3中找不到

How do i Cancel(user initiated) the request processing so that the worker actor is freed up and returns to the ready state to receive new requests? I was hoping for a method similar to java.util.concurrent.Future's cancel method but couldn't find in Akka 1.1.3

编辑:

我们试图通过 completeWithException 来获得所需的行为:

We tried to get the behavior we are looking for with completeWithException:

object Cancel {
  def main(args: Array[String]) {
    val actor = Actor.actorOf[CancelActor].start
    EventHandler.info(this, "Getting future")
    val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.completeWithException(new Exception("cancel"))
    EventHandler.info(this, "Future is " + future.get)
  }
}

class CancelActor extends Actor {
  def receive = {
    case "request" =>
      EventHandler.info(this, "start")
      (1 to 5).foreach(x => {
        EventHandler.info(this, "I am a long running process")
        Thread.sleep(200L)
      })
      self reply "response"
      EventHandler.info(this, "stop")
  }
}

但这并不能阻止长期运行的过程。

But that did not stop the long-running process.

    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Getting future
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
    [ERROR]   [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture] 
    java.lang.Exception: cancel
        at kozo.experimental.Cancel$.main(Cancel.scala:15)
...

    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop

相反,请考虑java.util.concurrent.Future的行为:

In contrast consider the behavior of of a java.util.concurrent.Future:

object Cancel2 {
  def main(args: Array[String]) {
    val executor: ExecutorService = Executors.newSingleThreadExecutor()
    EventHandler.info(this, "Getting future")
    val future = executor.submit(new Runnable {
      def run() {
        EventHandler.info(this, "start")
        (1 to 5).foreach(x => {
          EventHandler.info(this, "I am a long running process")
          Thread.sleep(200L)
        })
      }
    })
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.cancel(true)
    EventHandler.info(this, "Future is " + future.get)
  }
}

这会停止长期运行的过程

Which does stop the long running process

    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    Exception in thread "main" java.util.concurrent.CancellationException
...
    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling


推荐答案

您还可以检查Actor中Future的状态。

You could also check the status of the Future in the Actor.

class MyActor extends Actor {
  def receive = {
    case msg =>
      while(!self.senderFuture.get.isCompleted) {
        performWork(msg)
      }
      self reply result
  }
  ...
}

这要求消息以?或询问发送。
希望能有所帮助。

This requires the message to be sent with '?' or 'ask' though. Hope it helps.

这篇关于如何取消Akka演员?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆