python multiprocessing queue error
Question
I have this python code to read a file, do some processing and write the results in parallel:
# imports assumed by the snippets below (not shown in the original post)
import csv
import multiprocessing as mp
import os
from StringIO import StringIO  # io.StringIO on Python 3

import validation  # the poster's own module providing RowValidator


def line_chunker(path):
    """
    Reads a file in chunks and yields each chunk.
    Each chunk is guaranteed to end at a carriage return (EOL).
    Each chunk is returned as a single string.
    The number of chunks the file is split into is equal to the
    number of CPU cores available.
    """
    size = os.path.getsize(path)
    cores = mp.cpu_count()
    chunksize = size / cores  # gives truncated integer (Python 2 division)
    f = open(path)
    f.readline()  # skip header
    while True:
        part = f.readlines(chunksize)
        if not part:
            f.close()
            break
        else:
            yield "".join(part)
    f.close()


def _validate(chunk, outq):
    """Performs format validation on a given chunk of a csv file."""
    rows = csv.reader(StringIO(chunk))
    vld = validation.RowValidator(rows)
    vld.check_rows()
    outq.put(vld.errors)


def _write(outq):
    """Writes lines in the outq to a text file."""
    outfile = open("C:/testoutput.txt", "w")
    while True:
        result = outq.get()
        if result is None:
            outfile.close()
            break
        else:
            for line in result:
                outfile.write(line)
                outfile.write("\n")


def validate_pll(path):
    """Perform validation in parallel."""
    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize=8)
    writer = mp.Process(target=_write, args=(outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))
    pool.close()
    pool.join()
It reads the file in chunks and for each chunk starts a process to do the processing. The results of the processing are put on a queue, which is watched by another process.
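As an aside, the chunking step relies on readlines(sizehint), which reads whole lines until roughly sizehint bytes have been consumed, so chunks always break on line boundaries. A minimal, self-contained sketch (using a hypothetical temp file, with floor division so it also behaves on Python 3) to sanity-check that invariant:

```python
import multiprocessing as mp
import os
import tempfile

def line_chunker(path):
    """Yield the file after its header line, in chunks that always end
    on a line boundary, aiming for roughly one chunk per CPU core."""
    size = os.path.getsize(path)
    chunksize = size // mp.cpu_count() or 1  # floor division, at least 1
    with open(path) as f:
        f.readline()  # skip header
        while True:
            part = f.readlines(chunksize)  # ~chunksize bytes of whole lines
            if not part:
                break
            yield "".join(part)

# hypothetical demo file
path = os.path.join(tempfile.gettempdir(), "chunker_demo.csv")
with open(path, "w") as f:
    f.write("header\n")
    for i in range(1000):
        f.write("row%d,x\n" % i)

chunks = list(line_chunker(path))
body = open(path).read().split("\n", 1)[1]  # everything after the header
assert "".join(chunks) == body                 # nothing lost or duplicated
assert all(c.endswith("\n") for c in chunks)   # chunks end on line boundaries
```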
The code runs, but after completion I get an odd EOFError.
I suspect it is because I do not call writer.join(), but if I add this line, like so:
def validate_pll(path):
    """Perform validation in parallel."""
    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize=8)
    writer = mp.Process(target=_write, args=(outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))
    pool.close()
    pool.join()
    writer.join()
the code simply hangs. Any idea what I am doing wrong?
The error message given is:
Process Process-10:
Traceback (most recent call last):
  File "C:\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\SVN\PortfolioInspector\trunk\parallel.py", line 114, in _write
    result = outq.get()
  File "<string>", line 2, in get
  File "C:\Anaconda\lib\multiprocessing\managers.py", line 759, in _callmethod
    kind, result = conn.recv()
EOFError
Answer
The _write process is still waiting for entries to be written to outq when the main process ends. It waits for entries by holding a blocking connection open to the Manager process that manages the shared Queue. Now, at the point that the main process completes its execution, the Manager process shuts down, which sends an EOF on the connection that _write opened, and you see that exception.
To fix it, you need to tell _write to shut down prior to the main process ending (and, by extension, the Manager process shutting down). You actually already have a mechanism in place for this, you're just not using it: send a None to outq, and _write will do an orderly shutdown. Call that prior to writer.join(), and things should work fine.