Python multiprocessing and a shared counter


Problem Description

I'm having trouble with the multiprocessing module. I'm using a Pool of workers with its map method to load data from lots of files, and for each of them I analyze the data with a custom function. Each time a file has been processed I would like to have a counter updated so that I can keep track of how many files remain to be processed. Here is sample code:

import os

def analyze_data(args):
    # do something 
    counter += 1
    print(counter)


if __name__ == '__main__':

    list_of_files = os.listdir(some_directory)

    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)

I can't find a way to make this work.

Recommended Answer

The problem is that the counter variable is not shared between your processes: each separate process is creating its own local instance and incrementing that.
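To see this concretely, here is a minimal sketch (not the original code) in which each worker increments its own per-process copy of a module-level variable; the parent's copy is never touched:

```python
from multiprocessing import Pool

counter = 0

def bump(_):
    # runs in a worker process: this increments that process's own
    # copy of the module-level variable, invisible to the parent
    global counter
    counter += 1
    return counter

if __name__ == '__main__':
    with Pool(2) as p:
        per_worker_counts = p.map(bump, range(4))
    print(per_worker_counts)  # each worker's private count, not a shared 1..4
    print(counter)            # still 0 in the parent process
```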

See the "Sharing state between processes" section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here's a working version of your example (with some dummy input data). Note that it uses global values, which I would really try to avoid in practice:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print(i.get())
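As a side note: if the counter only exists to report progress, you can avoid shared state entirely and count finished tasks in the parent process with Pool.imap_unordered, which yields each result as soon as a worker finishes. A minimal sketch (the analyze_data body here is a dummy stand-in):

```python
from multiprocessing import Pool

def analyze_data(args):
    # stand-in for the real per-file analysis
    return args * 10

if __name__ == '__main__':
    inputs = [1, 2, 3, 4]
    results = []
    with Pool() as p:
        # imap_unordered yields results as they complete, so the parent
        # alone tracks progress -- no shared counter or lock needed
        for done, result in enumerate(p.imap_unordered(analyze_data, inputs), 1):
            results.append(result)
            print('%d/%d files processed' % (done, len(inputs)))
    print(sorted(results))  # [10, 20, 30, 40]
```

The trade-off is that results arrive in completion order rather than input order; use imap or map_async instead if ordering matters.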

