Read, compress, write with multiprocessing

Question

I'm compressing files. A single process is fine for a few of them, but I'm compressing thousands, and this can (and has) taken several days, so I'd like to speed it up with multiprocessing. I've read that I should avoid having multiple processes reading files at the same time, and I'm guessing I shouldn't have multiple processes writing at once either. This is my current method, running as a single process:

import tarfile, bz2, os

def compress(folder):
    "compresses a folder into a file"

    bz_file = bz2.BZ2File(folder + '.tbz', 'w')

    with tarfile.open(mode='w', fileobj=bz_file) as tar:

        for fn in os.listdir(folder):
            # read each file in the folder and do some pre-processing
            # that will make the compressed file much smaller than without
            tar.addfile(processed_file)  # placeholder for the pre-processed file

    bz_file.close()
    return

This takes a folder and compresses all its contents into a single file. This makes them easier to handle and more organized. If I just tossed this into a pool, then I'd have several processes reading and writing all at once, so I want to avoid that. I can rework it so only one process is reading the files, but I still have multiple processes writing:

import multiprocessing as mp
import tarfile, bz2, os

def compress(file_list):
    folder = file_list[0]
    bz_file = bz2.BZ2File(folder + '.tbz', 'w')

    with tarfile.open(mode='w', fileobj=bz_file) as tar:

        for i in file_list[1:]:
            # preprocess the file data
            tar.addfile(processed_data)  # placeholder for the pre-processed data

    bz_file.close()
    return

cpu_count = mp.cpu_count()
p = mp.Pool(cpu_count)

for subfolder in os.listdir(main_folder):
    # read all files in subfolder into memory and place them into file_list;
    # place file_list into fld_list until fld_list contains cpu_count
    # file lists, then pass to p.map(compress, fld_list)
    pass

This still has a number of processes writing compressed files at once. Just the act of telling tarfile what kind of compression to use starts writing to the hard drive. I can't read all the files I need to compress into memory, because I don't have that much RAM, so there's also the problem that I'm restarting Pool.map many times.
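For example, a minimal check (the file name here is made up) shows that merely creating the compressed file object already creates the output file on disk:

import bz2, os

bz_file = bz2.BZ2File('example.tbz', 'w')  # just opening it for writing...
print(os.path.exists('example.tbz'))       # ...already creates the file: True
bz_file.close()
os.remove('example.tbz')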

How can I read and write files in a single process, yet have all the compression in several processes, while avoiding restarting multiprocessing.Pool multiple times?

Answer

Instead of using multiprocessing.Pool, you should use multiprocessing.Queue and create an inbox and an outbox.

Start a single process to read in the files and place the data into the inbox queue, and put a limit on the size of the queue so you don't end up filling your RAM. The example here compresses single files, but it can be adjusted to handle whole folders at once.

import os

def reader(inbox, input_path, num_procs):
    "process that reads in files to be compressed and puts them into the inbox"

    for fn in os.listdir(input_path):
        path = os.path.join(input_path, fn)

        # read in each file, put its data into the inbox
        fname = os.path.basename(fn)
        with open(path, 'r') as src:
            lines = src.readlines()

        data = [fname, lines]
        inbox.put(data)

    # everything has been read; add a finished notice for each running process
    for i in range(num_procs):
        inbox.put(None)  # when a compressor sees a None, it will stop
    inbox.close()
    return
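The question actually wants one archive per folder; a rough sketch of that adaptation (hypothetical: it assumes each subfolder of input_path becomes one archive, and that compress is changed to loop over the packaged files) could look like this:

import os

def folder_reader(inbox, input_path, num_procs):
    "reads every file of each subfolder and puts one package per folder"
    for folder in os.listdir(input_path):
        folder_path = os.path.join(input_path, folder)
        if not os.path.isdir(folder_path):
            continue  # skip anything that isn't a folder

        files = []
        for fn in os.listdir(folder_path):
            with open(os.path.join(folder_path, fn), 'r') as src:
                files.append((fn, src.readlines()))

        inbox.put((folder, files))  # one package per folder

    for i in range(num_procs):
        inbox.put(None)  # stop signal for each compressor
    inbox.close()
    return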

But that's only half of the problem; the other part is compressing the file without having to write it to disk. We give an in-memory buffer to the compression function instead of an open file, and that buffer is passed to tarfile. Because the tar/bz2 stream is binary data, the buffer needs to be a BytesIO rather than a StringIO. Once compressed, we put the buffer into the outbox queue.

Except we can't quite do that, because BytesIO objects can't be pickled, and only picklable objects can go into a queue. However, the buffer's getvalue method returns its contents as plain bytes, which can be pickled, so grab the contents with getvalue, close the buffer, and then put the bytes into the outbox.

from io import BytesIO
import tarfile

def compressHandler(inbox, outbox):
    "process that pulls from the inbox, compresses, and puts to the outbox"
    supplier = iter(inbox.get, None)  # stops when it gets a None
    while True:
        try:
            data = next(supplier)     # grab data from the inbox
            pressed = compress(data)  # compress it
            outbox.put(pressed)       # put it into the outbox
        except StopIteration:
            outbox.put(None)  # finished compressing, inform the writer
            return            # and quit

def compress(data):
    "compress one file in memory"
    bz_file = BytesIO()

    fname, lines = data  # see the reader def for the package order
    payload = ''.join(lines).encode('utf-8')

    with tarfile.open(mode='w:bz2', fileobj=bz_file) as tar:
        info = tarfile.TarInfo(fname)        # store the file name
        info.size = len(payload)             # addfile copies exactly info.size bytes
        tar.addfile(info, BytesIO(payload))  # compress

    data = bz_file.getvalue()
    bz_file.close()
    return fname, data  # the writer needs the file name to build the output path
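As a quick, purely illustrative sanity check of compress on its own (the sample file name and lines are made up), the returned bytes can be unpacked in memory:

from io import BytesIO
import tarfile

fname, payload = compress(('hello.txt', ['first line\n', 'second line\n']))

# re-open the compressed bytes in memory to confirm the round trip
with tarfile.open(mode='r:bz2', fileobj=BytesIO(payload)) as tar:
    member = tar.extractfile('hello.txt')
    print(member.read().decode('utf-8'))  # prints the original two lines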

The writer process then extracts the contents from the outbox queue and writes them to disk. This function needs to know how many compression processes were started, so that it stops only after it has heard that every one of them has finished.

import os

def writer(outbox, output_path, num_procs):
    "single process that writes compressed files to disk"
    num_finished = 0

    while True:
        # stop once every compression process has finished
        if num_finished >= num_procs:
            break

        tardata = outbox.get()

        # a compression process has finished
        if tardata is None:
            num_finished += 1
            continue

        fn, data = tardata
        name = os.path.join(output_path, fn) + '.tbz'

        with open(name, 'wb') as dst:
            dst.write(data)
    return

Finally, here is the setup that puts it all together:

import multiprocessing as mp
import os

def setup():
    fld = 'file/path'

    # multiprocessing setup
    num_procs = mp.cpu_count()

    # inbox and outbox queues
    inbox = mp.Queue(4 * num_procs)  # limit the size so the reader can't fill RAM
    outbox = mp.Queue()

    # one process to read (named reader_proc so it doesn't shadow the reader function)
    reader_proc = mp.Process(target=reader, args=(inbox, fld, num_procs))
    reader_proc.start()

    # num_procs processes to compress
    compressors = [mp.Process(target=compressHandler, args=(inbox, outbox))
                   for i in range(num_procs)]
    for c in compressors: c.start()

    # one process to write
    writer_proc = mp.Process(target=writer, args=(outbox, fld, num_procs))
    writer_proc.start()
    writer_proc.join()  # wait for it to finish
    print('done!')
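One practical note that isn't spelled out above: launch everything from under a __main__ guard, because start methods that spawn a fresh interpreter (the default on Windows, and on macOS in recent Python versions) re-import the module in each child process:

if __name__ == '__main__':
    setup()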
