Assertion Error when using multiprocessing in Python 3.4


Problem description

I'm pretty new to Python and completely new to parallel processing.

I've been writing code to analyze punctate image data (think PALM lite) and trying to speed up my analysis code using the multiprocessing module.

For small data sets I see a pretty decent speed-up with up to four cores. For large data sets I start getting an AssertionError. I tried to make a boiled-down example that produces the same error; see below:

import numpy as np
import multiprocessing as mp
import os

class TestClass(object):
    def __init__(self, data):
        super().__init__()
        self.data = data

    def top_level_function(self, nproc=1):

        if nproc > os.cpu_count():
            nproc = os.cpu_count()

        if nproc == 1:
            sums = [self._sub_function() for i in range(10)]
        elif 1 < nproc:
            print('multiprocessing engaged with {} cores'.format(nproc))
            with mp.Pool(nproc) as p:
                sums = [p.apply_async(self._sub_function) for i in range(10)]
                sums = [pp.get() for pp in sums]

        self.sums = sums

        return sums

    def _sub_function(self):
        return self.data.sum(0)


if __name__ == "__main__":
    t = TestClass(np.zeros((126,512,512)))
    ans = t.top_level_function()
    print(len(ans))
    ans = t.top_level_function(4)
    print(len(ans))

    t = TestClass(np.zeros((126,2048,2048)))
    ans = t.top_level_function()
    print(len(ans))
    ans = t.top_level_function(4)
    print(len(ans))

Output:

10
multiprocessing engaged with 4 cores
10
10
multiprocessing engaged with 4 cores
Process SpawnPoolWorker-6:
Traceback (most recent call last):
  File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstrap
    self.run()
  File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Anaconda3\lib\multiprocessing\queues.py", line 355, in get
    res = self._reader.recv_bytes()
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError
Process SpawnPoolWorker-8:
Traceback (most recent call last):
  File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstrap
    self.run()
  File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Anaconda3\lib\multiprocessing\queues.py", line 355, in get
    res = self._reader.recv_bytes()
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError
Traceback (most recent call last):
  File "test.py", line 41, in <module>
    ans = t.top_level_function(4)
  File "test.py", line 21, in top_level_function
    sums = [pp.get() for pp in sums]
  File "test.py", line 21, in <listcomp>
    sums = [pp.get() for pp in sums]
  File "C:\Anaconda3\lib\multiprocessing\pool.py", line 599, in get
    raise self._value
  File "C:\Anaconda3\lib\multiprocessing\pool.py", line 383, in _handle_tasks
    put(task)
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "C:\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
OSError: [WinError 87] The parameter is incorrect

So the first example runs fine, but the latter example (the larger data set) crashes.

I'm pretty lost about where this error is coming from and how to fix it. Any help would be greatly appreciated.

Answer

When you do

sums = [p.apply_async(self._sub_function) for i in range(10)]

What happens is that self._sub_function will be pickled 10 times and sent to the worker processes. To pickle an instance method, the whole instance (including the data attribute) has to be pickled. A quick check shows that np.zeros((126, 2048, 2048)) requires 4227858596 bytes (roughly 4 GB) when pickled, and you're sending that amount once for each of the 10 tasks.
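The claim above is easy to check directly: pickling a bound method serializes its `__self__`, i.e. the whole instance, so the big array rides along with every task. A minimal sketch (the `Holder` class here is an illustrative stand-in for the poster's `TestClass`, with a much smaller array):

```python
import pickle

import numpy as np

class Holder:
    def __init__(self, data):
        self.data = data

    def _sub_function(self):
        return self.data.sum(0)

h = Holder(np.zeros((4, 128, 128)))

# Pickling the bound method embeds the pickled instance (and hence
# the array), so it can never be smaller than the array alone.
method_bytes = len(pickle.dumps(h._sub_function))
array_bytes = len(pickle.dumps(h.data))
print(method_bytes >= array_bytes)  # → True
```

Scale the array up to the question's (126, 2048, 2048) shape and those bytes have to cross a process pipe ten times.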

You're getting an error during _send_bytes, which means the transfer to the worker process was interrupted; my guess would be that you're hitting a memory limit.

You should probably rethink your design: multiprocessing usually works best when each worker can work on part of the problem without needing access to the whole data.
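One hedged sketch of such a redesign (the names `partial_sum` and `parallel_sum` are illustrative, not from the original code): move the work into a module-level function and split the array along its first axis, so each task pickles only its own slice rather than the whole instance.

```python
import numpy as np
from multiprocessing import Pool

def partial_sum(chunk):
    # Module-level function: only `chunk` is pickled per task,
    # never a whole object carrying the full data set.
    return chunk.sum(0)

def parallel_sum(data, nproc=4, ntasks=10):
    # Split along axis 0 so each task ships ~1/ntasks of the data.
    chunks = np.array_split(data, ntasks)
    with Pool(nproc) as p:
        partials = p.map(partial_sum, chunks)
    # Each partial already collapsed axis 0; combine them once more.
    return np.sum(partials, axis=0)

if __name__ == "__main__":
    data = np.ones((126, 64, 64))
    total = parallel_sum(data)
    print(total.shape)  # (64, 64)
```

This computes the same result as `data.sum(0)` while keeping the per-task payload at a fraction of the array, instead of ten full copies of it.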
