在多处理python中共享pandas数据框字典 [英] Share a dictionary of pandas dataframe across multiprocessing python

查看:128
本文介绍了在多处理python中共享pandas数据框字典的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个python pandas数据帧的字典.该词典的总大小约为2GB.但是,当我在16个多进程中共享它时(在子进程中,我只读取dict的数据而不修改它),它需要32GB的内存.因此,我想问一问我是否可以在多处理过程中共享该词典而不进行复制.我试图将其转换为manager.dict().但是似乎需要太长时间.实现这一目标的最标准方法是什么?谢谢.

I have a dictionary of python pandas dataframes. The total size of this dictionary is about 2GB. However, when I share it across 16 multiprocessing (in the subprocesses I only read the data of the dict without modifying it), it takes 32GB ram. So I would like to ask if it is possible for me to share this dictionary across multiprocessing without copying it. I tried to convert it to manager.dict(). But it seems it takes too long. What would be the most standard way to achieve this? Thank you.

推荐答案

我发现的最佳解决方案(仅适用于某些类型的问题)是通过Python的BaseManager和SyncManager类使用客户端/服务器设置.为此,您首先要设置一台服务器,该服务器充当数据的代理类.

The best solution I've found (and it only works for some types of problems) is to use a client/server setup using Python's BaseManager and SyncManager classes. To do this you first setup a Server that serve's up a proxy class for the data.

DataServer.py

#!/usr/bin/python
from    multiprocessing.managers import SyncManager
import  numpy

# Global for storing the data to be served
gData = {}

# Proxy class to be shared with different processes
# Don't put big data in here since that will force it to be piped to the
# other process when instantiated there, instead just return a portion of
# the global data when requested.
class DataProxy(object):
    def __init__(self):
        pass

    def getData(self, key, default=None):
        global gData
        return gData.get(key, None)

if __name__ == '__main__':
    port  = 5000

    print 'Simulate loading some data'
    for i in xrange(1000):
        gData[i] = numpy.random.rand(1000)

    # Start the server on address(host,port)
    print 'Serving data. Press <ctrl>-c to stop.'
    class myManager(SyncManager): pass
    myManager.register('DataProxy', DataProxy)
    mgr = myManager(address=('', port), authkey='DataProxy01')
    server = mgr.get_server()
    server.serve_forever()

运行以上命令一次,然后继续运行.下面是用于访问数据的客户端类.

Run the above once and leave it running. Below is the client class you use to access the data.

DataClient.py

from   multiprocessing.managers import BaseManager
import psutil   #3rd party module for process info (not strictly required)

# Grab the shared proxy class.  All methods in that class will be availble here
class DataClient(object):
    def __init__(self, port):
        assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'
        class myManager(BaseManager): pass
        myManager.register('DataProxy')
        self.mgr = myManager(address=('localhost', port), authkey='DataProxy01')
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    # Verify the server is running (not required)
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            if proc.name() == name:
                return True
        return False

下面是用于多处理的测试代码.

Below is the test code to try this with multiprocessing.

TestMP.py

#!/usr/bin/python
import time
import multiprocessing as mp
import numpy
from   DataClient import *    

# Confusing, but the "proxy" will be global to each subprocess, 
# it's not shared across all processes.
gProxy = None
gMode  = None
gDummy = None
def init(port, mode):
    global gProxy, gMode, gDummy
    gProxy  = DataClient(port).proxy
    gMode  = mode
    gDummy = numpy.random.rand(1000)  # Same as the dummy in the server
    #print 'Init proxy ', id(gProxy), 'in ', mp.current_process()

def worker(key):
    global gProxy, gMode, gDummy
    if 0 == gMode:   # get from proxy
        array = gProxy.getData(key)
    elif 1 == gMode: # bypass retrieve to test difference
        array = gDummy
    else: assert 0, 'unknown mode: %s' % gMode
    for i in range(1000):
        x = sum(array)
    return x    

if __name__ == '__main__':
    port   = 5000
    maxkey = 1000
    numpts = 100

    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if 0==mode: print 'Using client/server and %d processes' % nprocs
            if 1==mode: print 'Using local data and %d processes' % nprocs                
            keys = [numpy.random.randint(0,maxkey) for k in xrange(numpts)]
            pool = mp.Pool(nprocs, initializer=init, initargs=(port,mode))
            start = time.time()
            ret_data = pool.map(worker, keys, chunksize=1)
            print '   took %4.3f seconds' % (time.time()-start)
            pool.close()

当我在机器上运行它时,我得到...

When I run this on my machine I get...

Using local data and 16 processes
   took 0.695 seconds
Using local data and 1 processes
   took 5.849 seconds
Using client/server and 16 processes
   took 0.811 seconds
Using client/server and 1 processes
   took 5.956 seconds

这在您的多处理系统中是否对您有用,取决于获取数据的频率.每次传输都有少量的开销.如果您拒绝x=sum(array)循环中的迭代次数,则可以看到此信息.在某些时候,您将花费更多的时间来获取数据,而不是处理数据.

Whether this works for you in your multiprocessing system depends on how often have to grab the data. There's a small overhead associated with each transfer. You can see this if you turn down the number of iterations in the x=sum(array) loop. At some point you'll spend more time getting data than working on it.

除了多处理之外,我也喜欢这种模式,因为我只需要在服务器程序中加载一次大数组数据即可,并且在我杀死服务器之前一直保持加载状态.这意味着我可以对数据运行一堆单独的脚本,它们可以快速执行.无需等待数据加载.

Besides multiprocessing, I also like this pattern because I only have to load my big array data once in the server program and it stays loaded until I kill the server. That means I can run a bunch of separate scripts against the data and they execute quickly; no waiting for data to load.

虽然这里的方法有点类似于使用数据库,但是它的优点是可以处理任何类型的python对象,而不仅仅是简单的字符串和整数DB表等.我发现使用DB是一种对于那些简单的类型来说,速度要快一些,但是对我来说,它往往需要更多的编程工作,而且我的数据并不总是很容易地移植到数据库中.

While the approach here is somewhat similar to using a database, it has the advantage of working on any type of python object, not just simple DB tables of strings and ints, etc. I've found that using a DB is a bit faster for those simple types but for me, it tends to be more work programatically and my data doesn't always port over easily to a database.

这篇关于在多处理python中共享pandas数据框字典的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆