Getting erratic runtime exceptions trying to access persistent data in multiprocessing.Pool worker processes


Problem description


Inspired by this solution I am trying to set up a multiprocessing pool of worker processes in Python. The idea is to pass some data to the worker processes before they actually start their work and reuse it eventually. It's intended to minimize the amount of data which needs to be packed/unpacked for every call into a worker process (i.e. reducing inter-process communication overhead). My MCVE looks like this:

import multiprocessing as mp
import numpy as np

def create_worker_context():
    global context # create "global" context in worker process
    context = {}

def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
    context.update({
        'worker_id': worker_id,
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
        }) # store context information in global namespace of worker
    return True # return True, verifying that the worker process received its data

class data_analysis:
    def __init__(self):
        self.DTYPE = 'float32'
        self.CPU_LEN = mp.cpu_count()
        self.DIMS = 100
        self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype = self.DTYPE)
        # Init multiprocessing pool
        self.cpu_pool = mp.Pool(processes = self.CPU_LEN, initializer = create_worker_context) # create pool and context in workers
        pool_results = [
            self.cpu_pool.apply_async(
                init_worker_context,
                args = (core_id, self.some_const_array, self.DIMS, self.DTYPE)
            ) for core_id in range(self.CPU_LEN)
            ] # pass information to workers' context
        result_batches = [result.get() for result in pool_results] # check if they got the information
        if not all(result_batches): # raise an error if things did not work
            raise SyntaxError('Workers could not be initialized ...')

    @staticmethod
    def process_batch(batch_data):
        context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
        return context['tmp'] # return result

    def process_all(self):
        input_data = np.arange(0, self.DIMS ** 2, dtype = self.DTYPE).reshape(self.DIMS, self.DIMS)
        pool_results = [
            self.cpu_pool.apply_async(
                data_analysis.process_batch,
                args = (input_data,)
            ) for _ in range(self.CPU_LEN)
            ] # let workers actually work
        result_batches = [result.get() for result in pool_results]
        for batch in result_batches[1:]:
            np.add(result_batches[0], batch, out = result_batches[0]) # reduce batches
        print(result_batches[0]) # show result

if __name__ == '__main__':
    data_analysis().process_all()


I am running the above with CPython 3.6.6.


The strange thing is ... sometimes it works, sometimes it does not. If it does not work, the process_batch method throws an exception because it cannot find some_const_array as a key in context. The full traceback looks like this:

(env) me@box:/path> python so.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/path/so.py", line 37, in process_batch
    context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
KeyError: 'some_const_array'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/path/so.py", line 54, in <module>
    data_analysis().process_all()
  File "/path/so.py", line 48, in process_all
    result_batches = [result.get() for result in pool_results]
  File "/path/so.py", line 48, in <listcomp>
    result_batches = [result.get() for result in pool_results]
  File "/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
KeyError: 'some_const_array'


I am puzzled. What is going on here?


If my context dictionaries contain an object of a "higher type", e.g. a database driver or similar, I do not get this kind of problem. I can only reproduce it if my context dictionaries contain basic Python data types, collections, or numpy arrays.


(Is there a potentially better approach for achieving the same thing in a more reliable manner? I know my approach is considered a hack ...)

Recommended answer


You need to relocate the content of init_worker_context into your initializer function create_worker_context.
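Below is a minimal sketch of what that could look like, assuming the constant data is handed to every worker through the pool's initargs parameter. The names and the use of os.getpid() as a worker id are illustrative, not taken from the original code:

import multiprocessing as mp
import os
import numpy as np

def create_worker_context(some_const_array, DIMS, DTYPE):
    # Runs exactly once in every worker process when the pool spawns it,
    # so the per-worker context is guaranteed to exist before any task runs.
    global context
    context = {
        'worker_id': os.getpid(),  # no per-worker argument is available here
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
        }

def process_batch(batch_data):
    context['tmp'][:,:] = context['some_const_array'] + batch_data
    return context['tmp']

if __name__ == '__main__':
    DIMS, DTYPE = 100, 'float32'
    some_const_array = np.zeros((DIMS, DIMS), dtype = DTYPE)
    with mp.Pool(
        processes = mp.cpu_count(),
        initializer = create_worker_context,
        initargs = (some_const_array, DIMS, DTYPE),
        ) as pool:
        input_data = np.arange(0, DIMS ** 2, dtype = DTYPE).reshape(DIMS, DIMS)
        pool_results = [
            pool.apply_async(process_batch, args = (input_data,))
            for _ in range(mp.cpu_count())
            ]
        result_batches = [result.get() for result in pool_results]
    for batch in result_batches[1:]:
        np.add(result_batches[0], batch, out = result_batches[0])
    print(result_batches[0])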


Your assumption that every single worker process will run init_worker_context is responsible for your confusion. The tasks you submit to a pool get fed into one internal task queue that all worker processes read from. What happens in your case is that some worker processes complete their task and compete again for new tasks. So it can happen that one worker process executes multiple tasks while another one does not get a single one. Keep in mind that the OS schedules runtime for the threads (of the worker processes).
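The following small sketch (hypothetical, not from the original post) makes this visible: each task simply reports the pid of the worker that ran it. With tasks this fast, one worker often executes several of them, so there is no guarantee that every worker gets exactly one:

import multiprocessing as mp
import os

def report_pid(task_id):
    # Return the pid of whichever worker happened to pick up this task.
    return os.getpid()

if __name__ == '__main__':
    with mp.Pool(processes = 4) as pool:
        results = [pool.apply_async(report_pid, args = (i,)) for i in range(8)]
        pids = [r.get() for r in results]
    print(pids)  # often fewer distinct pids than tasks, sometimes fewer than 4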
