如何在Play framework 2应用程序中存储Akka actor的列表? [英] How to store a list of Akka actors in Play framework 2 application?

查看:199
本文介绍了如何在Play framework 2应用程序中存储Akka actor的列表?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Play framework 2应用程序可以接收数据并通过WebSockets将其发送到多个客户端。我使用Akka actor来处理WebSockets,就像在本文档中一样。我还有一个 WebSocketRouter 类,它扩展了 UntypedActor 并包含路由逻辑(决定哪些客户端将数据传递给系统收到)。我知道我可以使用Akka的路由器功能,但这对我来说不是问题。问题是我必须存储所有活动客户端的列表。现在我将它存储在 WebSocketRouter 类的静态列表中。这是编写概念验证原型的最快方法,但它不是线程安全的,似乎不是Akka方式。
下面是一个简化的代码示例:

I've got a Play framework 2 application that can receive data and send it to multiple clients via WebSockets. I use Akka actors to work with WebSockets, just like in this documentation. I also have a WebSocketRouter class that extends UntypedActor and contains routing logic (decides, which clients to pass the data the system receives to). I know that i can use the Router functionality of Akka, but that is not the issue at the moment for me. The issue is that i have to store a list of all active clients. Right now i store it in a static list of the WebSocketRouter class. That was the fastest way to write a proof-of-concept prototype, but it is not thread-safe and does not seem to be "the Akka way". Below is a simplified code sample:

WebSocketController:

//This controller handles the creation of WebSockets.
public class WebSocketController extends Controller {
    public static WebSocket<String> index() {
        return WebSocket.withActor(new F.Function<ActorRef, Props>() {
            public Props apply(ActorRef out) throws Throwable {
                return MessageSender.props(out);
            }
        });
    }
}

MessageSender:

//Hold a reference to the auto-created Actor that handles WebSockets 
//and also registers and unregisters itself in the router.
public class  MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;

    public MessageSender(ActorRef out) {
        this.out = out;
    }

    @Override
    public void preStart() {
        WebSocketRouter.addSender(getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        }
        else {
            unhandled(message);
        }
    }

    public void postStop() {
        WebSocketRouter.removeSender(getSelf());
    }
}

WebSocketRouter:

public class WebSocketRouter extends UntypedActor {
    private static ArrayList<ActorRef> senders;
    static {
        senders = new ArrayList<>();
    }

    public static void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    public static void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        }
    }
}

一次再次,我知道这是一个糟糕的解决方案,我正在寻求一个更好的解决方案。我曾想过创建一个可以保存当前连接的线程安全的单例类。我还想过在一些Akka演员的实例中保存当前连接列表并通过Akka消息修改列表,但为了这种方式工作,我必须存储 ActorRef 静态地访问该actor,以便可以从不同的 ActorSystem s访问它。

Once again, i know this is a bad solution and i'm seeking a better one. I have thought of creating a thread-safe singleton class that would hold current connections. I also thought about holding the list of current connections in an instance of some Akka actor and modifying the list via Akka messages, but for this way to work I'd have to store an ActorRef to that actor statically, so that it could be accessed from different ActorSystems.

什么是最好的解决我最适合Akka意识形态的问题的方法?

What is the best way to solve my problem that would fit best into Akka ideology?

推荐答案

而不是对Actor的静态引用( WebSocketRouter 在你的情况下),为什么不拿出一些消息发送呢?这样,演员可以以一致的方式维持自己的内部状态。通过消息进行状态更改是Actor模型的主要优点之一。

Instead of having a static reference to an Actor (WebSocketRouter in your case), why not come up with some messages to send it? That way, the actor can maintain its own internal state in a consistent way. State change through messages is one of the main benefits of the Actor Model.

在我进入代码之前,如果这不是100%准确,我很抱歉,我我只使用了Akka的Scala版本,并且正在快速扫描 Akka文档

Before I get into code, I'm sorry if this isn't 100% accurate, I've only used the Scala version of Akka and am basing this off a quick scan of the Akka Documentation.

所以在你的情况下,我会定义一些对象来表达加入/离开......

So in your case, I would define a few objects in order to express Join/Leave...

public class JoinMessage { } 
public class ExitMessage { } 

请注意,只有当您打算保持WebSocket打开并让用户停止收听时,才需要 ExitMessage 。路由器。否则,路由器可以检测到Actor何时被终止。

Note that ExitMessage is really only needed if you intend to keep your WebSocket open and have the user stop listening to the router. Otherwise, the router can detect when the Actor has been terminated.

然后你将你的 MessageSender actor更改为每当他们加入或离开聊天室时发送这些消息....

And then you would change your MessageSender actor to send these messages whenever they join or leave a chat room....

public class MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;
    private final ActorRef router;

    public MessageSender(ActorRef out) {
        this.out = out;
        this.router= getContext().actorSelection("/Path/To/WebSocketRouter");
    }

    @Override
    public void preStart() {
        router.tell(new JoinMessage(), getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        } else {
            unhandled(message);
        }
    }    
}

然后您的路由器可以更改在内部管理状态而不是在Actor上公开内部方法(你知道这不好)....

And then your router can change to manage state internally rather than exposing internal methods on the Actor (which as you know is not good)....

public class WebSocketRouter extends UntypedActor {
    private final Set<ActorRef> senders = new HashSet<>();

    private void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    private void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof JoinMessage) {
            addSender(sender);
            getContext().watch(sender); // Watch sender so we can detect when they die.
        } else if (message instanceof Terminated) {
            // One of our watched senders has died.
            removeSender(sender);
        } else if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        }
    }
}

再次,此代码旨在通过利用Actor模型让您了解如何完成此任务。对不起,如果Java不是100%准确,但希望你能按照我的意图。

Again, this code is to give you an idea of how to accomplish this task by taking advantage of the Actor Model. Sorry if the Java isn't 100% accurate, but hopefully you can follow my intent.

这篇关于如何在Play framework 2应用程序中存储Akka actor的列表?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆