Why does multiprocessing.Lock() not lock shared resource in Python?


Question

Suppose I have a very big text file consisting of many lines that I would like to reverse, and I don't care about the final order. The input file contains Cyrillic symbols. I use multiprocessing to process it on several cores.

I wrote a program like this:

# task.py

import multiprocessing as mp


POOL_NUMBER = 2


lock_read = mp.Lock()
lock_write = mp.Lock()

fi = open('input.txt', 'r')
fo = open('output.txt', 'w')

def handle(line):
    # In the future I want to do
    # some more complicated operations over the line
    return line.strip()[::-1]  # Reversing

def target():
    while True:
        try:
            with lock_read:
                line = next(fi)
        except StopIteration:
            break

        line = handle(line)

        with lock_write:
            print(line, file=fo)

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
    p.start()
for p in pool:
    p.join()

fi.close()
fo.close()

The program fails with this error:

Process Process-2:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte

On the other hand, everything works fine if I set POOL_NUMBER = 1, but that defeats the purpose if I want to gain performance from multiple cores.

Why does this error happen, and how can I fix it?

I use Python 3.5.2.

I generated the data with this script:

# gen_file.py

from random import randint


LENGTH = 100
SIZE = 100000


def gen_word(length):
    # Build a word of random lowercase Cyrillic letters (U+0430..U+044F)
    return ''.join(
        chr(randint(ord('а'), ord('я')))
        for _ in range(length)
    )


if __name__ == "__main__":
    with open('input.txt', 'w') as f:
        for _ in range(SIZE):
            print(gen_word(LENGTH), file=f)

Answer

The issue here is that reading one file from multiple processes doesn't work the way you expect: you can't share the open file object between processes. Each forked process gets its own copy of the buffered reader and UTF-8 decoder state while still pulling bytes from the same underlying file, so the reads interleave at arbitrary byte boundaries and can start in the middle of a multi-byte Cyrillic character, which is exactly what the UnicodeDecodeError is reporting.

You could make a global current_line variable and, on each iteration, read the file and process the current line, but that is not ideal.

Here is a different approach using a process pool and its map method: I iterate over the file and enqueue every line for your target method:

from multiprocessing import Pool
import time

POOL_NUMBER = 8

def target(line):
    # Stand-in for some real CPU work on the line
    for _ in range(2**10):
        pass
    return line[::-1]


pool = Pool(processes=POOL_NUMBER)
open('output.txt', 'w').close()  # Just to make sure we start with a clean, empty output file
with open('input.txt', 'r') as fi:
    t0 = time.time()
    processed_lines = pool.map(target, fi.readlines())
    print('Total time', time.time() - t0)

    with open('output.txt', 'w') as fo:
        for processed_line in processed_lines:
            fo.write(processed_line)

With 8 processes on my machine: Total time 1.3367934226989746

And with 1 process: Total time 4.324501991271973

This works best if your target function is CPU-bound. A different approach would be to split the file into POOL_NUMBER chunks and have each process write its processed chunk of data (with a lock!) to the output file.
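
Just to illustrate that chunking idea, here is a minimal sketch (the helper names, the POOL_NUMBER value and the binary-mode reading are my own assumptions, not part of the original answer): the byte ranges are aligned to line boundaries, every worker opens its own handle on input.txt, and writes to output.txt are serialized with a multiprocessing.Lock.

import os
from multiprocessing import Lock, Process

POOL_NUMBER = 4

def find_chunks(path, n):
    # Split the file into n byte ranges that start and end on line boundaries
    size = os.path.getsize(path)
    boundaries = [0]
    with open(path, 'rb') as f:
        for i in range(1, n):
            f.seek(size * i // n)
            f.readline()                  # skip forward to the next full line
            boundaries.append(f.tell())
    boundaries.append(size)
    return list(zip(boundaries[:-1], boundaries[1:]))

def process_chunk(start, end, lock):
    out = []
    with open('input.txt', 'rb') as f:    # every process gets its OWN handle
        f.seek(start)
        while f.tell() < end:
            line = f.readline().decode('utf-8')
            out.append(line.strip()[::-1])
    if out:
        with lock:                        # serialize writes to the shared output
            with open('output.txt', 'a', encoding='utf-8') as fo:
                fo.write('\n'.join(out) + '\n')

if __name__ == '__main__':
    open('output.txt', 'w').close()       # start from an empty output file
    lock = Lock()
    procs = [Process(target=process_chunk, args=(start, end, lock))
             for start, end in find_chunks('input.txt', POOL_NUMBER)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()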

Another approach is to create a master process that does the write job for the rest of the processes.
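
A minimal sketch of that writer pattern could look like this (the queue sizes, the STOP sentinel and the function names are illustrative choices of mine): the parent is the only process that reads input.txt, the workers pull lines from a task queue and reverse them, and a single writer process is the only one that ever touches output.txt.

from multiprocessing import Process, Queue

POOL_NUMBER = 4
STOP = None  # sentinel marking the end of the stream

def worker(tasks, results):
    for line in iter(tasks.get, STOP):
        results.put(line.strip()[::-1])
    results.put(STOP)                    # tell the writer this worker is done

def writer(results, n_workers):
    finished = 0
    with open('output.txt', 'w', encoding='utf-8') as fo:
        while finished < n_workers:
            item = results.get()
            if item is STOP:
                finished += 1
            else:
                fo.write(item + '\n')

if __name__ == '__main__':
    tasks, results = Queue(maxsize=10000), Queue(maxsize=10000)
    workers = [Process(target=worker, args=(tasks, results))
               for _ in range(POOL_NUMBER)]
    w = Process(target=writer, args=(results, POOL_NUMBER))
    for p in workers + [w]:
        p.start()
    with open('input.txt', 'r', encoding='utf-8') as fi:
        for line in fi:                  # only the parent reads the input file
            tasks.put(line)
    for _ in workers:
        tasks.put(STOP)
    for p in workers + [w]:
        p.join()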

EDIT

After your comment I figured that you can't fit the whole file into memory. For that, you can simply iterate over the file object, which reads it into memory line by line. But then we need to modify the code a little:

from multiprocessing import Pool
import time

POOL_NUMBER = 8
CHUNK_SIZE = 50000

def target(line):
    # This is not a measurable task, since most of the time will be spent
    # writing the data; if you have a CPU-bound task, this code will make sense
    return line[::-1]


pool = Pool(processes=POOL_NUMBER)
open('output.txt', 'w').close()  # Just to make sure we start with a clean, empty output file
processed_lines = []

with open('input.txt', 'r') as fi:
    t0 = time.time()
    for line in fi:  # reading the file line by line
        # Keep a reference to this task, but don't wait for the result yet
        processed_lines.append(pool.apply_async(target, (line,)))

        if len(processed_lines) == CHUNK_SIZE:
            with open('output.txt', 'a') as fo:  # append the finished chunk
                for processed_line in processed_lines:
                    fo.write(processed_line.get())
            # Truncate the result list and let the garbage collector reclaim the
            # memory; if we don't clear the list we will run out of memory!
            processed_lines = []

    if processed_lines:  # flush whatever is left over from the last chunk
        with open('output.txt', 'a') as fo:
            for processed_line in processed_lines:
                fo.write(processed_line.get())
    print('Total time', time.time() - t0)

Keep in mind that you can play with the CHUNK_SIZE variable to control how much memory you use; for me, around 5000 was the most each process could handle.

PS

I think it would be best to split the big file into smaller files; that way you avoid the read/write locking on a single file, and it also scales out for processing (even on a different machine!).
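
As a rough sketch of that pre-splitting idea (file names, part size and helper names are assumptions of mine): cut input.txt into standalone part files first, then let each worker read and write only its own pair of files, with no shared handles or locks at all.

from itertools import islice
from multiprocessing import Process

LINES_PER_PART = 25000  # hypothetical part size; tune it to your file

def split_input(path='input.txt'):
    # Cut the big file into standalone part files
    parts = []
    with open(path, 'r', encoding='utf-8') as fi:
        idx = 0
        while True:
            chunk = list(islice(fi, LINES_PER_PART))
            if not chunk:
                break
            name = 'input.part%d.txt' % idx
            with open(name, 'w', encoding='utf-8') as fp:
                fp.writelines(chunk)
            parts.append(name)
            idx += 1
    return parts

def reverse_file(src, dst):
    # Each worker reads and writes only its own pair of files -- no locks needed
    with open(src, 'r', encoding='utf-8') as fi, \
         open(dst, 'w', encoding='utf-8') as fo:
        for line in fi:
            fo.write(line.strip()[::-1] + '\n')

if __name__ == '__main__':
    procs = [Process(target=reverse_file, args=(part, part + '.out'))
             for part in split_input()]
    for p in procs:
        p.start()
    for p in procs:
        p.join()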
