Message not being sent when Akka actor is killed using DeathWatch

Question

I'm attempting to send a message when an actor is killed.

This is based on the Akka DeathWatch documentation: http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java

The example in the documentation waits for a "kill" message in the watching actor, but I never actually send that message. So, to receive the Terminated message in ServiceActor, I use:

else if (msg instanceof Terminated) {
        final Terminated t = (Terminated) msg;
        if (t.getActor() == child) {
            lastSender.tell(Msg.TERMINATED, getSelf());
        }
    } else {
        unhandled(msg);
    }

I've set the duration to 10 milliseconds:

Duration.create(10, TimeUnit.MILLISECONDS)

But the message Msg.TERMINATED is never received in the onReceive method of HelloWorld:

@Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }

How can I send a message to HelloWorld when ServiceActor fails?

Full code:

package terminatetest;
import akka.Main;

public class Launcher {

    public static void main(String args[]) {

        String[] akkaArgsArray = new String[1];

        akkaArgsArray[0] = "terminatetest.HelloWorld";

        Main.main(akkaArgsArray);

    }

}

package terminatetest;


import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell(PoisonPill.getInstance(), getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }
}

package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
    {
        this.getContext().watch(child);
    }

    ActorRef lastSender = getContext().system().deadLetters();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            try {
                long startTime = System.currentTimeMillis();
                URL url = new URL(urlName);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.connect();

                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder out = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    out.append(line);
                }
                System.out.println("Connection successful to " + url);
                System.out.println("Content is " + out);
                long endTime = System.currentTimeMillis();
                System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

            } catch (MalformedURLException mue) {
                println("URL Name " + urlName);
                System.out.println("MalformedURLException");
                System.out.println(mue.getMessage());
                mue.printStackTrace();
                getSender().tell(Msg.FAIL, getSelf());
            } catch (IOException ioe) {
                println("URL Name " + urlName);
                System.out.println("IOException");
                System.out.println(ioe.getMessage());
                ioe.printStackTrace();
                System.out.println("Now exiting");
                getSender().tell(Msg.FAIL, getSelf());
            }
        }

        else if (msg instanceof Terminated) {
                final Terminated t = (Terminated) msg;
                if (t.getActor() == child) {
                    lastSender.tell(Msg.TERMINATED, getSelf());
                }
            } else {
                unhandled(msg);
            }
    }

}

Update: I'm now initiating the PoisonPill from the child actor itself (ServiceActor), using the changes below.

Update to ServiceActor:

if (urlName.equalsIgnoreCase("poisonPill")) {   
    this.getSelf().tell(PoisonPill.getInstance(), getSelf());
    getSender().tell(Msg.TERMINATED, getSelf());
}

Update to HelloWorld:

system.scheduler().scheduleOnce(
        Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
            public void run() {
                greeter.tell("poisonPill", getSelf());
            }
        }, system.dispatcher());

This displays the following output:

startTime : 1412777375414
Connection successful to http://www.google.com
Content is ....... (I've removed the content for brevity)
Total Time : 1268 milliseconds
Terminated

The "poisonPill" message is sent after 10 milliseconds, yet in this example the actor lives for 1268 milliseconds. So why doesn't the actor terminate when the "poisonPill" message is sent? Is it because the timings are so short?

Updated code:

package terminatetest;


import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell("poisonPill", getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }
}


package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    ActorRef lastSender = getSender();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            if (urlName.equalsIgnoreCase("poisonPill")) {   
                this.getSelf().tell(PoisonPill.getInstance(), getSelf());
                getSender().tell(Msg.TERMINATED, getSelf());
            }

            else {

                try {
                    long startTime = System.currentTimeMillis();
                    System.out.println("startTime : "+startTime);
                    URL url = new URL(urlName);
                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                    conn.connect();

                    BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                    StringBuilder out = new StringBuilder();
                    String line;
                    while ((line = reader.readLine()) != null) {
                        out.append(line);
                    }
                    System.out.println("Connection successful to " + url);
                    System.out.println("Content is " + out);
                    long endTime = System.currentTimeMillis();
                    System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

                } catch (MalformedURLException mue) {
                    println("URL Name " + urlName);
                    System.out.println("MalformedURLException");
                    System.out.println(mue.getMessage());
                    mue.printStackTrace();
                    getSender().tell(Msg.FAIL, getSelf());
                } catch (IOException ioe) {
                    println("URL Name " + urlName);
                    System.out.println("IOException");
                    System.out.println(ioe.getMessage());
                    ioe.printStackTrace();
                    System.out.println("Now exiting");
                    getSender().tell(Msg.FAIL, getSelf());
                }
            }
        }
    }

}

Recommended answer

I think your problem stems from the fact that you only set lastSender once, during construction of the ServiceActor, and you explicitly set it to deadLetters. If you want to send a message back to the actor that sent you the String message, you need to set lastSender to that message's sender (getSender()) when you handle it. Otherwise, your Msg.TERMINATED will always go to dead letters.
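The effect of never updating lastSender can be shown with a small plain-Java model. This is only a sketch and does not use Akka: `Ref`, `Recorder`, `Service`, and the `rememberSender` flag are hypothetical stand-ins invented for illustration. In the real ServiceActor, the corresponding change would be assigning `lastSender = getSender();` inside the String branch of onReceive.

```java
import java.util.ArrayList;
import java.util.List;

final class LastSenderSketch {

    /** Hypothetical stand-in for an ActorRef: anything that accepts a message. */
    interface Ref {
        void tell(String message);
    }

    /** Records every message it is told, so the outcome can be inspected. */
    static final class Recorder implements Ref {
        final List<String> received = new ArrayList<>();

        @Override
        public void tell(String message) {
            received.add(message);
        }
    }

    /** Model of ServiceActor: replies to lastSender when it sees "Terminated". */
    static final class Service {
        private Ref lastSender;               // starts out as dead letters
        private final boolean rememberSender; // false reproduces the question's bug

        Service(Ref deadLetters, boolean rememberSender) {
            this.lastSender = deadLetters;
            this.rememberSender = rememberSender;
        }

        void onReceive(String message, Ref sender) {
            if ("Terminated".equals(message)) {
                lastSender.tell("TERMINATED");
            } else if (rememberSender) {
                lastSender = sender; // the fix: capture the requester per message
            }
        }
    }

    /** Sends a request, then a termination notice; returns what the requester got. */
    static List<String> run(boolean rememberSender) {
        Recorder deadLetters = new Recorder();
        Recorder requester = new Recorder();
        Recorder child = new Recorder();
        Service service = new Service(deadLetters, rememberSender);
        service.onReceive("http://www.google.com", requester);
        service.onReceive("Terminated", child);
        return requester.received;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + run(false));
        System.out.println("with fix:    " + run(true));
    }
}
```

With `rememberSender` set to false, the reply lands in the dead-letter recorder and the requester sees nothing, which mirrors the behaviour described in the question.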

EDIT

I see the real issue here now. In the HelloWorld actor, you are sending a PoisonPill to the ServiceActor. The ServiceActor stops itself as a result, which also stops child (because it is a child actor of ServiceActor). At this point, you would think the Terminated message would be delivered to ServiceActor because it explicitly watches child (and it probably does get delivered), but you've already sent a PoisonPill to ServiceActor, so it will not process any message received after the PoisonPill (which includes the Terminated). That's why the block:

else if (msg instanceof Terminated) {

is never hit in ServiceActor.
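As a rough illustration of why nothing queued behind a PoisonPill is handled, here is a plain-Java model of a mailbox being drained. It is a simplified sketch, not Akka code: `PoisonPillSketch` and `drain` are hypothetical names, and the model ignores Akka's actual shutdown protocol, treating the pill as the point where processing stops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PoisonPillSketch {

    static final String POISON_PILL = "PoisonPill";

    /**
     * Walks the mailbox in arrival order and returns the messages that were
     * handled before the pill stopped the model actor.
     */
    static List<String> drain(List<String> mailbox) {
        List<String> handled = new ArrayList<>();
        for (String message : mailbox) {
            if (POISON_PILL.equals(message)) {
                break; // the actor stops here; later messages are never handled
            }
            handled.add(message);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> mailbox =
                Arrays.asList("http://www.google.com", POISON_PILL, "Terminated");
        System.out.println("handled: " + drain(mailbox));
    }
}
```

In this model the "Terminated" entry sits behind the pill, so the branch that would handle it never runs.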

EDIT2

Your actor receives the request to hit Google first and the "poisonPill" message second (10 milliseconds later). Because an actor processes its mailbox in order, one message at a time, it fully processes the request to hit Google before it processes the message telling it to stop. That's why the actor doesn't stop after 10 milliseconds. You can't stop an actor in the middle of what it's doing.
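The ordering described above can be reproduced without Akka by using a single worker thread that drains a FIFO queue, which is a reasonable stand-in for one actor and its mailbox. The sketch below is an assumption-laden model: `MailboxOrderSketch`, `run`, and the sleep that replaces the HTTP call are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MailboxOrderSketch {

    /** Queues a slow "fetch", then a stop request shortly after; returns the event log. */
    static List<String> run(long fetchMillis, long pillDelayMillis) {
        List<String> log = new CopyOnWriteArrayList<>();
        // One thread working through a FIFO queue models an actor and its mailbox.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            mailbox.execute(() -> {
                log.add("fetch started");
                try {
                    Thread.sleep(fetchMillis); // stands in for the blocking HTTP call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                log.add("fetch finished");
            });

            Thread.sleep(pillDelayMillis); // the scheduler delay from the question
            mailbox.execute(() -> log.add("poisonPill handled"));

            mailbox.shutdown(); // already queued work still runs to completion
            mailbox.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(300, 10));
    }
}
```

Even though the stop request is queued only 10 milliseconds in, it is handled after the slow task has finished, because the single worker never interrupts the message it is currently processing.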
