python multiprocessing queue error


Question

I have this Python code to read a file, do some processing, and write the results in parallel:

import csv
import multiprocessing as mp
import os
from io import StringIO

def line_chunker(path):
    """
    Reads a file in chunks and yields each chunk.
    Each chunk is guaranteed to end at a newline (EOL).
    Each chunk is returned as a single string.

    The number of chunks the file is split into is equal to the number
    of CPU cores available.
    """
    size = os.path.getsize(path)
    cores = mp.cpu_count()
    chunksize = size // cores # floor division gives an integer byte hint

    f = open(path)
    s = f.readline() # skip header
    while True:
        part = f.readlines(chunksize)
        if not part:
            f.close()
            break
        else:
            yield "".join(part)
    f.close()

def _validate(chunk, outq):
    """ Performs format validation on a given chunk of a csv file """
    rows = csv.reader(StringIO(chunk))
    vld = validation.RowValidator(rows)
    vld.check_rows()
    outq.put(vld.errors)

def _write(outq):
    """Writes lines in the outq to a text file """
    outfile = open("C:/testoutput.txt", "w")
    while True:
        result = outq.get()
        if result is None:
            outfile.close()
            break
        else:
            for line in result:
                outfile.write(line)
                outfile.write("\n")

def validate_pll(path):    
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()

It reads the file in chunks and for each chunk starts a process to do the processing. The results of the processing are put on a queue, which is watched by another process.

The code runs, but after completion I get an odd EOFError.

I suspect it is because I do not call writer.join(), but if I add this line, like so:

def validate_pll(path):    
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()
    writer.join()

The code simply hangs. Any idea what I am doing wrong?

The error message given is:

Process Process-10:
Traceback (most recent call last):
  File "C:\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\SVN\PortfolioInspector\trunk\parallel.py", line 114, in _write
    result = outq.get()
  File "<string>", line 2, in get
  File "C:\Anaconda\lib\multiprocessing\managers.py", line 759, in _callmethod
    kind, result = conn.recv()
EOFError

Answer

The _write process is still waiting for entries to be written to outq when the main process ends. It waits for entries by holding a blocking connection open to the Manager process that manages the shared Queue. At the point the main process completes its execution, the Manager process shuts down, which sends an EOF over the connection that _write opened, and you see that exception.

To fix it, you need to tell _write to shut down before the main process ends (and, by extension, before the Manager process shuts down). You actually already have a mechanism in place for this, you're just not using it: send a None to outq, and _write will do an orderly shutdown. Do that before calling writer.join(), and things should work fine.
