Can't map a function to tarfile members in parallel

Question

I have a tarfile containing bz2-compressed files. I want to apply the function clean_file to each of the bz2 files and collate the results. Done serially, this is easy with a loop:

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool

def clean_file(member):
    if '.bz2' in str(member):

        f = tr.extractfile(member)

        with bz2.open(f, "rt") as bzinput:
            dicts = []
            for i, line in enumerate(bzinput):
                line = line.replace('"name"}', '"name":" "}')
                dat = json.loads(line)
                dicts.append(dat)

        bzinput.close()
        f.close()
        del f, bzinput

        processed = dicts[0]
        return processed

    else:
        pass


# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)


# Apply the clean_file function in series
i=0
processed_files = []
for m in members:
    processed_files.append(clean_file(m))
    i+=1
    print('done '+str(i)+'/'+str(num_files))
    

However, I need to be able to do this in parallel. The method I'm trying uses Pool like so:

# Apply the clean_file function in parallel
if __name__ == '__main__':
    with Pool(2) as p:
        processed_files = list(p.map(clean_file, members))

But this returns an OSError:

Traceback (most recent call last):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "parse_data.py", line 19, in clean_file
    for i, line in enumerate(bzinput):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py", line 195, in read1
    return self._buffer.read1(size)
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 68, in readinto
    data = self.read(len(byte_view))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 103, in read
    data = self._decompressor.decompress(rawblock, size)
OSError: Invalid data stream
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py", line 1167, in __iter__
    for obj in iterable:
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 735, in next
    raise value
OSError: Invalid data stream

So I guess this way isn't properly accessing the files from within data.tar or something. How can I apply the function in parallel?

I'm guessing this will work with any tar archive containing bz2 files, but here's my data to reproduce the error: https://github.com/johnf1004/reproduce_tar_error

Answer

It seems some race condition was happening. Opening the tar file separately in every child process solves the issue:

import json
import bz2
import tarfile
import logging
from multiprocessing import Pool


def clean_file(member):
    if '.bz2' not in str(member):
        return
    try:
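        # Re-open the tar file inside the worker so each process gets its own file handle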
        with tarfile.open('data.tar') as tr:
            with tr.extractfile(member) as bz2_file:
                with bz2.open(bz2_file, "rt") as bzinput:
                    dicts = []
                    for i, line in enumerate(bzinput):
                        line = line.replace('"name"}', '"name":" "}')
                        dat = json.loads(line)
                        dicts.append(dat)
                        return dicts[0]
    except Exception:
        logging.exception(f"Error while processing {member}")


def process_serial():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')


def process_parallel():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_parallel()


if __name__ == '__main__':
    main()
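
A small usage note (my own addition, not part of the original answer): clean_file returns None for members that are not .bz2 files and when an exception is logged, so the collected list can be filtered afterwards if only the parsed records are wanted:

results = [r for r in processed_files if r is not None]  # drop the None placeholders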

Note that another way to solve this problem is to just use the spawn start method:

multiprocessing.set_start_method('spawn')

By doing this, we are instructing Python to "deep-copy" file handles in child processes. Under the default "fork" start method, the file handles of parent and child share the same offsets.
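
For reference, here is a minimal sketch of how that might look with the question's original script layout (this is my own arrangement, assuming data.tar sits in the working directory as in the question). set_start_method has to be called in the main module, inside the __name__ == '__main__' guard, before the Pool is created; under spawn, each worker re-imports the script and therefore opens its own tar handle:

import bz2
import json
import multiprocessing
import tarfile
from multiprocessing import Pool

# Module level: under the spawn start method each worker re-imports this script,
# so every worker ends up opening its own tar file handle.
tr = tarfile.open('data.tar')
members = tr.getmembers()


def clean_file(member):
    # Same logic as the question's clean_file, but reading from this process's own tr
    if '.bz2' not in str(member):
        return None
    with bz2.open(tr.extractfile(member), "rt") as bzinput:
        dicts = []
        for line in bzinput:
            line = line.replace('"name"}', '"name":" "}')
            dicts.append(json.loads(line))
    return dicts[0]


if __name__ == '__main__':
    # Must be called once, in the main module, before the Pool is created
    multiprocessing.set_start_method('spawn')
    with Pool(2) as p:
        processed_files = list(p.map(clean_file, members))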
