写入多处理文件 [英] Writing to a file with multiprocessing
问题描述
我需要做一些并行计算,其结果需要按顺序写入一个文件。所以我创建了一个接收 multiprocessing.Queue
和一个文件句柄的函数,进行计算并在文件中输出结果:
import multiprocessing
from multiprocessing import Process,Queue $ b $ from mySimulation import doCalculation
#doCalculation(pars)是一个函数我必须运行许多不同的参数集合,并将结果收集到一个文件
def work(queue,fh):
,而True:
try:
参数= queue.get(block = False)
result = doCalculation(parameter)
print>> fh,string
除了:
break
if __name__ ==__main__:
nthreads = multiprocessing.cpu_count()
fh = open(foo,w)
workQueue = Queue()
parList =#我想要运行的条件列表doCalculation()
在列表中的x:
workQueue.put(x)
processes = [Process(target = writefh,args = (workQueue,fh))为范围(nthreads)]
为进程中的p:
p.start()
为进程中的p:
p.join()
fh.close()
但脚本运行后文件结束为空。我尝试将worker()函数更改为:
pre $ def $(queue,filename)
while True:
try:
fh = open(filename,a)
parameter = queue.get(block = False)
result = doCalculation(parameter)
print> ;> fh,string
fh.close()
除了:
break
并传递文件名作为参数。然后它按照我的意图工作。当我尝试按顺序做同样的事情,没有多处理,它也正常工作。
为什么在第一个版本中没有工作?我看不到问题。
另外:我可以保证两个进程不会同时写入这个文件吗?
编辑:
谢谢。我现在明白了这是工作版本:
import multiprocessing
from multiprocessing import Process,Queue
from time import sleep b
$ b $返回参数* par#只是为了模拟一些计算
def feed(queue,parlist):
用于参数p:b $ b queue.put(par)
def calc (queueIn,queueOut):
而真:
尝试:
par = queueIn.get(block = False)
printdeal with,par,
res = doCalculation(par)
queueOut.put((par,res))
除了:
break
def write(queue,fname):
fhandle = open(fname,w)
while:
try:
par,res = queue.get(block = False)
print>>除非:
break
fh ()
if __name__ ==
writerQueue = Queue()
parlist = [1,2,3,4,5,6,7,8,9,10]
feedProc = Process(target = feed,args = (workerQueue,parlist))
calcProc = [Process(target = calc,args =(workerQueue,writerQueue))for i in range(nthreads)]
writProc = Process(target = write,args =( writerQueue,fname))
feedProc.start()
for calcProc:
p.start()
writProc.start()
feedProc.join()
for calcProc:
p.join()
writProc.join()
解决方案您真的应该使用两个队列和三种不同的处理方式。 b $ b
- 计算,把东西放在队列#2。你可以有许多这样的,因为他们从一个队列中获得,并安全地进入另一个队列。 从队列#2中获取东西并将其写入文件。你必须有这些,而不是更多。它拥有该文件,保证原子访问,并绝对保证该文件写得干净而一致。
把东西放入队列#1。
I'm having the following problem in python.
I need to do some calculations in parallel whose results I need to be written sequentially in a file. So I created a function that receives a
multiprocessing.Queue
and a file handle, do the calculation and print the result in the file:import multiprocessing from multiprocessing import Process, Queue from mySimulation import doCalculation # doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file def work(queue, fh): while True: try: parameter = queue.get(block = False) result = doCalculation(parameter) print >>fh, string except: break if __name__ == "__main__": nthreads = multiprocessing.cpu_count() fh = open("foo", "w") workQueue = Queue() parList = # list of conditions for which I want to run doCalculation() for x in parList: workQueue.put(x) processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)] for p in processes: p.start() for p in processes: p.join() fh.close()
But the file ends up empty after the script runs. I tried to change the worker() function to:
def work(queue, filename): while True: try: fh = open(filename, "a") parameter = queue.get(block = False) result = doCalculation(parameter) print >>fh, string fh.close() except: break
and pass the filename as parameter. Then it works as I intended. When I try to do the same thing sequentially, without multiprocessing, it also works normally.
Why it didn't worked in the first version? I can't see the problem.
Also: can I guarantee that two processes won't try to write the file simultaneously?
EDIT:
Thanks. I got it now. This is the working version:
import multiprocessing from multiprocessing import Process, Queue from time import sleep from random import uniform def doCalculation(par): t = uniform(0,2) sleep(t) return par * par # just to simulate some calculation def feed(queue, parlist): for par in parlist: queue.put(par) def calc(queueIn, queueOut): while True: try: par = queueIn.get(block = False) print "dealing with ", par, "" res = doCalculation(par) queueOut.put((par,res)) except: break def write(queue, fname): fhandle = open(fname, "w") while True: try: par, res = queue.get(block = False) print >>fhandle, par, res except: break fhandle.close() if __name__ == "__main__": nthreads = multiprocessing.cpu_count() fname = "foo" workerQueue = Queue() writerQueue = Queue() parlist = [1,2,3,4,5,6,7,8,9,10] feedProc = Process(target = feed , args = (workerQueue, parlist)) calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] writProc = Process(target = write, args = (writerQueue, fname)) feedProc.start() for p in calcProc: p.start() writProc.start() feedProc.join () for p in calcProc: p.join() writProc.join ()
解决方案You really should use two queues and three separate kinds of processing.
Put stuff into Queue #1.
Get stuff out of Queue #1 and do calculations, putting stuff in Queue #2. You can have many of these, since they get from one queue and put into another queue safely.
Get stuff out of Queue #2 and write it to a file. You must have exactly 1 of these and no more. It "owns" the file, guarantees atomic access, and absolutely assures that the file is written cleanly and consistently.
这篇关于写入多处理文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文