How to calculate the mean of distributed data?

Question

How can I calculate the arithmetic mean of a large vector (series) in a distributed setting where the data is partitioned across multiple nodes? I do not want to use the MapReduce paradigm. Is there a distributed algorithm that computes the mean efficiently, other than the trivial approach of computing a partial sum on each node, sending the results to a master node, and dividing by the size of the vector (series)?
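For reference, the trivial baseline described in the question can be sketched as below. Each node reduces its partition to a (sum, count) pair and the master combines the pairs. The names `PartialMean`, `Summarize`, and `Combine` are hypothetical, and the network transfer between nodes and master is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type: what each node would send to the master.
public readonly struct PartialMean
{
    public PartialMean(decimal sum, long count) { Sum = sum; Count = count; }
    public decimal Sum { get; }
    public long Count { get; }
}

public static class BaselineMean
{
    // Runs on each node: reduce the local partition to a (sum, count) pair.
    public static PartialMean Summarize(IEnumerable<decimal> partition)
    {
        decimal sum = 0m;
        long count = 0;
        foreach (var x in partition) { sum += x; count++; }
        return new PartialMean(sum, count);
    }

    // Runs on the master: total sum divided by total count.
    public static decimal Combine(IEnumerable<PartialMean> partials)
    {
        decimal sum = 0m;
        long count = 0;
        foreach (var p in partials) { sum += p.Sum; count += p.Count; }
        if (count == 0) throw new InvalidOperationException("No data on any node.");
        return sum / count;
    }
}
```

Sending the count along with the sum means the master does not need to know the vector size in advance, and partitions of different sizes are weighted correctly.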

Recommended answer

Distributed average consensus is an alternative.

The problem with the trivial approach of map-reduce with a master is that, with a vast data set, the final result depends on every partial result, so the calculation can take a very long time. By the time it finishes, the result is out of date, and therefore wrong, unless you lock the entire data set, which is impractical for a massive set of distributed data. With distributed average consensus (the same methods work for statistics other than the mean), you get a more up-to-date, better estimate of the current value of the mean, without locking the data, and in real time. Here is a link to a paper on it (it is math heavy): http://web.stanford.edu/~boyd/papers/pdf/lms_consensus.pdf You can find many more papers on it with a web search.
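To make the idea concrete, here is a minimal sketch of one simple consensus scheme: a fixed-step linear averaging iteration in which every node repeatedly moves its value toward its neighbours' values. This is an illustrative member of the family, not necessarily the scheme analysed in the linked paper. The names `ConsensusSketch`, `Step`, and `Estimate` are hypothetical, and the sketch assumes a connected network with symmetric links, a step size below 1 / (maximum number of neighbours), and a positive count on every node. Running the iteration on both the local sums and the local counts lets each node estimate the global mean as their ratio.

```csharp
public static class ConsensusSketch
{
    // One synchronous round: every node moves toward its neighbours' values.
    // neighbours[i] lists the nodes that node i exchanges values with.
    public static double[] Step(double[] values, int[][] neighbours, double stepSize)
    {
        var next = new double[values.Length];
        for (int i = 0; i < values.Length; i++)
        {
            double pull = 0.0;
            foreach (int j in neighbours[i]) pull += values[j] - values[i];
            next[i] = values[i] + stepSize * pull;
        }
        return next;
    }

    // Returns each node's estimate of the global mean after the given number of rounds.
    public static double[] Estimate(double[] localSums, double[] localCounts,
                                    int[][] neighbours, double stepSize, int rounds)
    {
        var sums = (double[])localSums.Clone();
        var counts = (double[])localCounts.Clone();
        for (int r = 0; r < rounds; r++)
        {
            sums = Step(sums, neighbours, stepSize);
            counts = Step(counts, neighbours, stepSize);
        }
        var estimates = new double[sums.Length];
        for (int i = 0; i < sums.Length; i++) estimates[i] = sums[i] / counts[i];
        return estimates;
    }
}
```

No node ever sees the whole data set or acts as master: each one only talks to its neighbours, and the estimates on all nodes approach the same value as the rounds go on.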

The general concept is this: each node runs a socket listener. A node evaluates its local count and average, then publishes them to the other nodes. Each node listens for the other nodes and receives their counts and averages on a timescale that makes sense. It can then make a good estimate of the overall average as sumForAllNodes(storedAverage[node] * storedCount[node]) / sumForAllNodes(storedCount[node]). If you have a truly large data set, you could just listen for new values as they are stored in the node, amend the local count and average, and publish them.
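The last point, amending the local count and average as new values arrive, can be done without rescanning the partition. A minimal sketch, assuming a hypothetical `RunningMean` class that the node calls once per newly stored value:

```csharp
// Hypothetical helper: keeps a node's count and mean current as values arrive.
public sealed class RunningMean
{
    public long Count { get; private set; }
    public decimal Mean { get; private set; }

    // Fold in one newly stored value: the mean moves toward the value by 1/Count of the gap.
    public void Add(decimal value)
    {
        Count++;
        Mean += (value - Mean) / Count;
    }
}
```

After each `Add`, the node would publish the current `Mean` and `Count` to its peers.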

If even this takes too long, you could average over a random subset of the data in each node.
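A rough sketch of the sampling variant, assuming a hypothetical `SampledMean.Estimate` helper that keeps each value with probability `fraction`:

```csharp
using System;
using System.Collections.Generic;

public static class SampledMean
{
    // Averages roughly `fraction` of the values, each chosen independently at random.
    // Returns the sample mean and the number of values actually sampled.
    public static (decimal Mean, long Count) Estimate(
        IEnumerable<decimal> partition, double fraction, Random rng)
    {
        decimal sum = 0m;
        long count = 0;
        foreach (var x in partition)
        {
            if (rng.NextDouble() < fraction) { sum += x; count++; }
        }
        return count == 0 ? (0m, 0L) : (sum / count, count);
    }
}
```

When nodes sample different fractions, the weight a node publishes with its sample mean should be its full local count rather than the sampled count, otherwise lightly sampled nodes are under-weighted in the aggregate.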

Here is some C# code to give you the idea. It uses Fleck for the server side so that it runs on more versions of Windows than Microsoft's built-in WebSocket server support, which is limited to newer Windows versions. Run it on two nodes, one with

<appSettings>
    <add key="thisNodeName" value="UK" />
</appSettings>

in the app.config, and "EU-North" in the other. Here is the sample code. The two instances exchange means using WebSockets. You just need to add your own back-end enumeration of the database.

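using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using Fleck;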

namespace WebSocketServer
{
    class Program
    {
        static List<IWebSocketConnection> _allSockets;
        static Dictionary<string,decimal> _allMeans;
        static Dictionary<string,decimal> _allCounts;
        private static decimal _localMean;
        private static decimal _localCount;
        private static decimal _localAggregate_count;
        private static decimal _localAggregate_average;

        static void Main(string[] args)
        {
            _allSockets = new List<IWebSocketConnection>();
            _allMeans = new Dictionary<string, decimal>();
            _allCounts = new Dictionary<string, decimal>();

            var serverAddresses = new Dictionary<string,string>();
            //serverAddresses.Add("USA-WestCoast", "ws://127.0.0.1:58951");
            //serverAddresses.Add("USA-EastCoast", "ws://127.0.0.1:58952");
            serverAddresses.Add("UK", "ws://127.0.0.1:58953");
            serverAddresses.Add("EU-North", "ws://127.0.0.1:58954");
            //serverAddresses.Add("EU-South", "ws://127.0.0.1:58955");
            foreach (var serverAddress in serverAddresses)
            {
                _allMeans.Add(serverAddress.Key, 0m);
                _allCounts.Add(serverAddress.Key, 0m);
            }

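            //ConfigurationManager replaces the obsolete ConfigurationSettings (needs a reference to System.Configuration)
            var thisNodeName = ConfigurationManager.AppSettings["thisNodeName"];   //for example "UK"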
            var serverSocketAddress = serverAddresses.First(x=>x.Key==thisNodeName);
            serverAddresses.Remove(thisNodeName);

            var websocketServer = new Fleck.WebSocketServer(serverSocketAddress.Value);

            websocketServer.Start(socket =>
            {
                socket.OnOpen = () =>
                {
                    Console.WriteLine("Open!");
                    _allSockets.Add(socket);
                };
                socket.OnClose = () =>
                {
                    Console.WriteLine("Close!");
                    _allSockets.Remove(socket);
                };
                socket.OnMessage = message =>
                {
                    Console.WriteLine(message + " received");

                    var parameters = message.Split('~');
                    var remoteHost = parameters[0];
                    var remoteMean = decimal.Parse(parameters[1]);
                    var remoteCount = decimal.Parse(parameters[2]);
                    _allMeans[remoteHost] = remoteMean;
                    _allCounts[remoteHost] = remoteCount;


                };
            });
            while (true)
            {
                //evaluate my local average and count
                //(random placeholder values here; replace with a query against your own data)
                Random rand = new Random(DateTime.Now.Millisecond);
                _localMean = 234.00m + (rand.Next(0, 100) - 50)/10.0m;
                _localCount = 222m + rand.Next(0, 100);

                //evaluate my local aggregate average using means and counts sent from all other nodes
                //could publish aggregate averages to other nodes, if you wanted to monitor disagreement between nodes
                var total_mean_times_count = 0m;
                var total_count = 0m;
                foreach (var server in serverAddresses)
                {
                    total_mean_times_count += _allCounts[server.Key]*_allMeans[server.Key];
                    total_count += _allCounts[server.Key];
                }
                //add on local mean and count which were removed from the server list earlier, so won't be processed
                total_mean_times_count += (_localMean * _localCount);
                total_count = total_count + _localCount;

                _localAggregate_average = (total_mean_times_count/total_count);
                _localAggregate_count = total_count;

                Console.WriteLine("local aggregate average = {0}", _localAggregate_average);

                System.Threading.Thread.Sleep(10000);
                foreach (var serverAddress in serverAddresses)
                {
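                    try
                    {
                        using (var wscli = new ClientWebSocket())
                        using (var tokSrc = new CancellationTokenSource())
                        {
                            wscli.ConnectAsync(new Uri(serverAddress.Value), tokSrc.Token).Wait();

                            //message format: nodeName~mean~count
                            var payload = Encoding.UTF8.GetBytes(thisNodeName + "~" + _localMean + "~" + _localCount);
                            //endOfMessage is true so that the text is sent as one complete message
                            wscli.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Text, true, tokSrc.Token).Wait();
                        }
                    }
                    catch (AggregateException ex)
                    {
                        //peer not reachable; keep its last known values and retry on the next cycle
                        Console.WriteLine("could not reach {0}: {1}", serverAddress.Key, ex.InnerException?.Message);
                    }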

                }
            }
        }



    }
}
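The sample formats and parses the `nodeName~mean~count` message with the machine's current culture, which can go wrong when nodes run under different regional settings (for example, a decimal comma on one node and a decimal point on the other). A minimal sketch of a culture-independent encoding follows; `MeanMessage`, `Encode`, and `TryDecode` are hypothetical names, not part of Fleck or the sample.

```csharp
using System.Globalization;

public static class MeanMessage
{
    // Formats "nodeName~mean~count" with invariant-culture numbers.
    public static string Encode(string nodeName, decimal mean, decimal count) =>
        string.Join("~", nodeName,
            mean.ToString(CultureInfo.InvariantCulture),
            count.ToString(CultureInfo.InvariantCulture));

    // Returns false instead of throwing when the message is malformed.
    public static bool TryDecode(string message, out string nodeName,
                                 out decimal mean, out decimal count)
    {
        nodeName = null; mean = 0m; count = 0m;
        var parts = message.Split('~');
        if (parts.Length != 3) return false;
        nodeName = parts[0];
        return decimal.TryParse(parts[1], NumberStyles.Number, CultureInfo.InvariantCulture, out mean)
            && decimal.TryParse(parts[2], NumberStyles.Number, CultureInfo.InvariantCulture, out count);
    }
}
```

Both sides need to switch together: encoding with the invariant culture while the receiver still parses with its local culture would not fix the mismatch.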

Don't forget to add a static lock, or to separate the activities by synchronising at given times (not shown for simplicity).
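One possible shape for the lock, as a sketch only: the socket callback and the main loop both touch the per-node means and counts, so both go through a single lock object. `NodeStats` is a hypothetical class that would stand in for the `_allMeans` and `_allCounts` dictionaries.

```csharp
using System.Collections.Generic;

// Hypothetical store for the latest mean and count reported by each node.
public sealed class NodeStats
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, decimal> _means = new Dictionary<string, decimal>();
    private readonly Dictionary<string, decimal> _counts = new Dictionary<string, decimal>();

    // Called from the socket callback when a node reports new figures.
    public void Update(string node, decimal mean, decimal count)
    {
        lock (_gate) { _means[node] = mean; _counts[node] = count; }
    }

    // Called from the main loop: reads a consistent snapshot under the same lock.
    public decimal AggregateMean()
    {
        lock (_gate)
        {
            decimal weighted = 0m, total = 0m;
            foreach (var pair in _counts)
            {
                weighted += pair.Value * _means[pair.Key];
                total += pair.Value;
            }
            return total == 0m ? 0m : weighted / total;
        }
    }
}
```

Updating the mean and the count inside one lock keeps the pair consistent, so the main loop never combines a new mean with an old count.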
