Python多处理和共享计数器 [英] Python multiprocessing and a shared counter

查看:63
本文介绍了Python多处理和共享计数器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在多处理模块上遇到了麻烦.我正在使用具有其map方法的工作人员池从大量文件中加载数据,并且对于每个文件,我都使用自定义函数来分析数据.每次处理文件时,我都希望更新一个计数器,以便可以跟踪还有多少文件需要处理. 这是示例代码:

I'm having troubles with the multiprocessing module. I'm using a Pool of workers with its map method to load data from lots of files and for each of them I analyze data with with a custom function. Each time a file has been processed I would like to have a counter updated so that I can keep track of how many files remains to be processed. Here is sample code:

def analyze_data( args ):
    # do something 
    counter += 1
    print counter


if __name__ == '__main__':

    list_of_files = os.listdir(some_directory)

    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)

我找不到解决方案.

推荐答案

问题在于,您的进程之间没有共享counter变量:每个单独的进程都在创建它自己的本地实例并对其进行递增.

The problem is that the counter variable is not shared between your processes: each separate process is creating it's own local instance and incrementing that.

有关某些内容,请参见文档的本节您可以用来在进程之间共享状态的技术.在您的情况下,您可能希望在您之间共享一个 Value 实例工人

See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers

这是您的示例的有效版本(带有一些虚拟输入数据).请注意,它使用的是全局值,在实践中我会尽量避免使用该值:

Here's a working version of your example (with some dummy input data). Note it uses global values which I would really try to avoid in practice:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print i.get()

这篇关于Python多处理和共享计数器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆