Sharing a counter with multiprocessing.Pool

Question

I'd like to use multiprocessing.Value + multiprocessing.Lock to share a counter between separate processes. For example:

import itertools as it
import multiprocessing

def func(x, val, lock):
    for i in range(x):
        i ** 2
    with lock:
        val.value += 1
        print('counter incremented to:', val.value)

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    with multiprocessing.Pool() as pool:
        pool.starmap(func, ((i, v, lock) for i in range(25)))
    print(v.value)

This raises the following exception:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

What I am most confused by is that a related (albeit not completely analogous) pattern works with multiprocessing.Process():

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    procs = [multiprocessing.Process(target=func, args=(i, v, lock))
             for i in range(25)]
    for p in procs: p.start()
    for p in procs: p.join()

Now, I recognize that these are two markedly different things:

  • the first example uses a number of worker processes equal to cpu_count(), and splits an iterable range(25) between them
  • the second example creates 25 worker processes and tasks, each with one input

That said: how can I share an instance with pool.starmap() (or pool.map()) in this manner?

I've seen similar questions here, here, and here, but those approaches don't seem to be suited to .map()/.starmap(), regardless of whether Value uses ctypes.c_int.

I realize that this approach technically works:

def func(x):
    for i in range(x):
        i ** 2
    with lock:
        v.value += 1
        print('counter incremented to:', v.value)

v = None
lock = None

def set_global_counter_and_lock():
    """Egh ... """
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

if __name__ == '__main__':
    # Each worker process will call `initializer()` when it starts.
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))

Is this really the best-practices way of going about this?

Answer

The RuntimeError you get when using Pool occurs because arguments for pool methods are pickled before being sent over a (pool-internal) queue to the worker processes. Which pool method you are trying to use is irrelevant here. This doesn't happen when you just use Process because no queue is involved. You can reproduce the error just with pickle.dumps(multiprocessing.Value('i', 0)).
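As a quick sanity check, here is a minimal sketch of that reproduction: pickling a Value outside of process creation raises the same error, without any Pool involved.

import pickle
import multiprocessing

# Synchronized objects refuse to be pickled: they are meant to be
# inherited by child processes, not sent over a queue.
try:
    pickle.dumps(multiprocessing.Value('i', 0))
except RuntimeError as err:
    print(err)  # "Synchronized objects should only be shared between processes through inheritance"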

Your last code snippet doesn't work the way you think it does. You are not sharing one Value; you are recreating independent counters for every child process.
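To see this concretely, here is a small illustrative sketch (the names init_counter and task are hypothetical, not from the question): each task reports its process id along with its process-local counter, and you will typically see several independent counters rather than one shared total.

import os
import multiprocessing

v = None

def init_counter():
    # Runs once per worker: each process builds its *own* Value here,
    # so nothing is actually shared between workers.
    global v
    v = multiprocessing.Value('i', 0)

def task(_):
    with v.get_lock():
        v.value += 1
        return (os.getpid(), v.value)

if __name__ == '__main__':
    with multiprocessing.Pool(4, initializer=init_counter) as pool:
        results = pool.map(task, range(25))
    # Several distinct pids, each with its own running count --
    # the per-process counts sum to 25, but no single counter is global.
    print(results)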

If you were on Unix and used the default start method "fork", you could get away with simply not passing the shared objects as arguments into the pool methods: your child processes would inherit the globals through forking. With the start methods "spawn" (the default on Windows, and on macOS since Python 3.8) or "forkserver", you have to use the initializer during Pool instantiation to let the child processes inherit the shared objects.
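For illustration, a minimal sketch of that fork-based variant (assuming Unix; the global name shared_cnt and the explicit set_start_method('fork') call are my additions, not part of the answer):

import multiprocessing

# Created at module level *before* the Pool: forked workers inherit it,
# so nothing has to be pickled or passed as an argument.
shared_cnt = multiprocessing.Value('i', 0)

def func(x):
    with shared_cnt.get_lock():
        shared_cnt.value += 1

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')  # Unix-only; fails on Windows
    with multiprocessing.Pool() as pool:
        pool.map(func, range(25))
    print(shared_cnt.value)  # 25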

Note, you don't need an extra multiprocessing.Lock here, because multiprocessing.Value comes by default with an internal one you can use.

import os
from multiprocessing import Pool, Value #, set_start_method


def func(x):
    for i in range(x):
        assert i == i
        with cnt.get_lock():
            cnt.value += 1
            print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')


def init_globals(counter):
    global cnt
    cnt = counter


if __name__ == '__main__':

    # set_start_method('spawn')

    cnt = Value('i', 0)
    iterable = [10000 for _ in range(10)]

    with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
        pool.map(func, iterable)

    assert cnt.value == 100000

It's probably also worth noting that you don't need the counter to be shared in all cases. If you just need to keep track of how often something has happened in total, an option is to keep separate worker-local counters during the computation and sum them up at the end. This can yield a significant performance improvement for frequent counter updates that don't need synchronization during the parallel computation itself.
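A rough sketch of that alternative (the function name count_locally is illustrative, not from the answer): each task keeps a plain local counter and returns it, and the parent sums the results.

import multiprocessing

def count_locally(x):
    # Plain Python int -- no shared state, no locking needed.
    local_count = 0
    for i in range(x):
        local_count += 1
    return local_count

if __name__ == '__main__':
    iterable = [10000 for _ in range(10)]
    with multiprocessing.Pool() as pool:
        total = sum(pool.map(count_locally, iterable))
    print(total)  # 100000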
