Python multiprocessing: Global objects not being copied to children properly


Problem description

A few days back I answered a question on SO regarding reading a tar file in parallel.

Here is the gist of the question:

import bz2
import tarfile
from multiprocessing import Pool

tr = tarfile.open('data.tar')

def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return
    with tr.extractfile(tar_file_entry) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            # Reading bz2 file
            ...  # reading logic elided in the original question


def process_serial():
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')


def process_parallel():
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_serial() # No error
    process_parallel() # Error


if __name__ == '__main__':
    main()

As mentioned in that answer, the error goes away if we simply open the tar file in the child process instead of in the parent process.

I am not able to understand why this worked.

Even if we open the tarfile in the parent process, the child process will get a new copy. So why does explicitly opening the tarfile in the child process make any difference?

Does this mean that in the first case, the child processes were somehow mutating the common tarfile object and causing memory corruption due to concurrent writes?
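
For reference, the fix amounts to something like the sketch below: each worker opens its own tarfile handle instead of inheriting the parent's. This is only a sketch assuming the layout from the question; the worker body is a guess because the original reading logic is elided, the TAR_PATH constant is added here for clarity, and 'data.tar' is just the path used above.

import bz2
import tarfile
from multiprocessing import Pool

TAR_PATH = 'data.tar'


def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return None
    # Open the archive inside the child process: every worker gets its own
    # file descriptor and read offset, so seeks cannot interfere.
    with tarfile.open(TAR_PATH) as tr:
        with tr.extractfile(tar_file_entry) as bz2_file:
            with bz2.open(bz2_file, "rt") as bzinput:
                return bzinput.read()  # placeholder for the elided processing


def process_parallel():
    # TarInfo objects are picklable, so the member list can be built once
    # in the parent and shipped to the workers.
    with tarfile.open(TAR_PATH) as tr:
        members = tr.getmembers()
    with Pool() as pool:
        return pool.map(clean_file, members)


if __name__ == '__main__':
    print(len(process_parallel()))

Reopening the archive for every member is wasteful; a Pool initializer that opens one handle per worker process would give the same isolation with less overhead.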

Recommended answer

FWIW, the answer in the comments wrt open is actually incorrect on UNIX-like systems regarding file handle numbers.

If multiprocessing uses fork() (which it does under Linux and similar, although I read there was an issue with forking on macOS), the file handles and everything else are happily copied to child processes (by "happily" I mean it's complicated in many edge cases such as forking threads, but still it works fine for file handles).

The following works just fine for me:

import multiprocessing

this = open(__file__, 'r')


def read_file():
    print(len(this.read()))


def main():
    process = multiprocessing.Process(target=read_file)
    process.start()
    process.join()


if __name__ == '__main__':
    main()

The problem is likely that tarfile has internal structure and/or buffering while reading; you can also simply run into conflicts by trying to seek to and read different parts of the same archive simultaneously. I.e., I'm speculating that using a thread pool without any synchronization is likely to run into exactly the same issues in this case.

Edit: to clarify, extracting a file from a Tar archive is likely (I haven't checked the exact details) done as follows: (1) seek to the offset of the encapsulated part (file), (2) read a chunk of the encapsulated file and write the chunk to the destination file (or pipe, or w/e), (3) repeat (2) until the whole file is extracted.

Attempting to do this in a non-synchronized way from parallel processes using the same file handle will likely result in mixing of these steps, i.e. starting to process file #2 will seek away from file #1 while we are still in the middle of reading file #1, etc.
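
Purely to illustrate what "synchronization" would mean here (this is not the fix from the referenced answer, just a hypothetical variation on the example above), a lock created before the fork can serialize the whole seek-and-read pair so the steps of two extractions can no longer interleave. A minimal sketch, assuming the fork start method on Linux:

import multiprocessing

this = open(__file__, 'rb')
lock = multiprocessing.Lock()  # created in the parent, inherited by the forked children


def read_file(worker):
    # Hold the lock across the whole seek + read pair so the other worker
    # cannot move the shared offset in between.
    with lock:
        this.seek(0)
        data = this.read(80)
    print(worker, data)


def main():
    processes = [multiprocessing.Process(target=read_file, args=(number,))
                 for number in (1, 2)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

With the lock both workers print the same 80 bytes; without it, the output depends on how the two seek/read pairs happen to interleave on the shared descriptor.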

Edit2, answering the comment below: Memory representation is forked afresh for child processes, that's true; but resources managed on the kernel side (such as file handles and kernel buffers) are shared.

To illustrate:

import multiprocessing

this = open(__file__, 'rb')


def read_file(worker):
    print(worker, this.read(80))


def main():
    processes = []

    for number in (1, 2):
        processes.append(
            multiprocessing.Process(target=read_file, args=(number,)))

    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

Running this on Linux I get:

$ python3.8 test.py 
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n   "
2 b''

If seeking and reading were independent, both processes would print an identical result, but they don't. Since this is a small file and Python opts to buffer a small amount of data (8 KiB), the first process reads to EOF, and the second process has no data left to read (unless, of course, it seeks back).
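
Conversely (a variation on the snippet above, mirroring the fix mentioned in the question), if each child opens the file itself, the descriptors and offsets are independent and both workers do print the same 80 bytes:

import multiprocessing


def read_file(worker):
    # Opened inside the child: a private descriptor with its own offset,
    # which the other worker's reads cannot move.
    with open(__file__, 'rb') as this:
        print(worker, this.read(80))


def main():
    processes = [multiprocessing.Process(target=read_file, args=(number,))
                 for number in (1, 2)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()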

