Akka onReceive 方法是否并发执行? [英] Does Akka onReceive method execute concurrently?

查看:22
本文介绍了Akka onReceive 方法是否并发执行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我的场景:

我有一个主actor,它接收来自多个子actor的消息.这些消息包含要聚合的数据.在这个聚合逻辑中,如果我使用共享数据结构来收集聚合,我是否需要处理同步问题?

I have a master actor, which receives messages from multiple child actors. These messages contain data to be aggregated. In this aggregation logic, do I need to take care of synchronization issues, if I use a shared data structure to collect the aggregation?

else if(arg0 instanceof ReducedMsg){

                           ReducedMsg reduced = (ReducedMsg)arg0;
        counter.decrementAndGet();

        synchronized(finalResult){

            finalResult.add((KeyValue<K, V>) reduced.getReduced());

            if(counter.get() == 0){
                                    if(checkAndReduce(finalResult)){

                    finalResult.clear();
                }
                else{
                    stop();
                    latch.countDown();
                }

            }

        }



    }

如您所见,我有一个 finalResult,每条消息都将聚合到该结果中,并且在处理逻辑之后,还需要清除集合.

So as you can see I have a finalResult, to which each message will be aggregated, and after a processing logic the collection needs to be cleared as well.

实际上我想要实现的是递归(关联)归约 mapreduce.所以我需要保留我假设的同步块?还是说 Akka 一次执行一个线程?

Actually what I am trying to implement is a recursive (associative) reduction mapreduce. So I need to keep the synchronized block I assume? Or is it by any chance Akka executes the onReceive one thread at a time?

此逻辑在小数据集上产生准确且可预测的结果.我的问题是当我的输入数据集有点大时,代码会挂起.我想确定这是因为我的同步块的上下文切换,以便我可以深入研究不同的设计.

This logic produces accurate and predictable result on small data set. My problem is when my input data set is a little large, the code hangs. I want to be sure that is because of the context switches for my synchronization block, so that I may dwelve into a different design.

推荐答案

onReceive()从不并发调用.这是 Akka 给你的最基本的保证.

onReceive() is never called concurrently. This is the most fundamental guarantee Akka is giving to you.

这意味着,如果您的 counter 变量是 actor 中的一个字段,并且没有其他代码可以直接访问该字段,那么您可以安全地使用普通的 int/long 而不是 AtomicInteger/AtomicLong.假设 finalResult 是一个封装并隐藏在 actor 中的字段,也不需要同步它.

This means that if your counter variable is a field in an actor and no other piece of code can access that field directly, you can safely use normal int/long instead of AtomicInteger/AtomicLong. Also synchronization on finalResult is not necessary assuming it is a field encapsulated and hidden in an actor.

最后,CountDownLatch 的用法是可疑的.在 Akka 应用程序中,您不应使用任何同步原语.Actors 本质上是单线程的,所有的通信(包括唤醒和传递数据)都应该通过消息传递来实现.

Finally the usage of CountDownLatch is suspicious. In Akka applications you shouldn't use any synchronizations primitives. Actors are essentially single-threaded and all communication (including waking up and passing data) should be implemented via message passing.

这在文档中都有解释:http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model

这篇关于Akka onReceive 方法是否并发执行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆