Writing to a file with multiprocessing


Problem description


I'm having the following problem in Python.

I need to do some calculations in parallel whose results need to be written sequentially to a file. So I created a function that receives a multiprocessing.Queue and a file handle, does the calculation, and prints the result to the file:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
    while True:
        try:
            parameter = queue.get(block = False)
            result = doCalculation(parameter)
            print >>fh, result
        except:
            break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = work, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    fh.close()

But the file ends up empty after the script runs. I tried changing the work() function to:

def work(queue, filename):
    while True:
        try:
            fh = open(filename, "a")
            parameter = queue.get(block = False)
            result = doCalculation(parameter)
            print >>fh, result
            fh.close()
        except:
            break

passing the filename as a parameter. Then it works as I intended. When I do the same thing sequentially, without multiprocessing, it also works normally.

Why didn't it work in the first version? I can't see the problem.

Also: can I guarantee that two processes won't try to write to the file simultaneously?
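
For reference, appending from several processes does not by itself prevent interleaved writes. A minimal sketch of one common safeguard, a shared multiprocessing.Lock held around each write; the worker function and the "shared.log" filename here are illustrative, not from the original code:

from multiprocessing import Process, Lock

def worker(lock, filename, ident):
    # Hypothetical worker: each write happens entirely inside the lock,
    # so lines from different processes cannot interleave.
    for j in range(3):
        with lock:
            fh = open(filename, "a")
            fh.write("process %s, line %s\n" % (ident, j))
            fh.close()   # closing flushes the buffer before the lock is released

if __name__ == "__main__":
    lock = Lock()
    procs = [Process(target = worker, args = (lock, "shared.log", i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()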


EDIT:

Thanks. I got it now. This is the working version:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
        queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed, args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join()
    for p in calcProc:
        p.join()
    writProc.join()
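
One caveat in this version: queueIn.get(block = False) raises Queue.Empty the moment the queue is empty, so a worker that polls before the feeder has finished will exit early, and the bare except: also swallows any real error from doCalculation(). A narrower variant of the calc loop, assuming a short timeout is an acceptable end-of-input signal:

from Queue import Empty   # Python 2; on Python 3: from queue import Empty

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(timeout = 1)   # wait briefly instead of busy-polling
        except Empty:
            break                            # assume the feeder is done
        queueOut.put((par, doCalculation(par)))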

Solution

You really should use two queues and three separate kinds of processing; a sketch of the full pipeline follows the list.

  1. Put stuff into Queue #1.

  2. Get stuff out of Queue #1 and do calculations, putting stuff in Queue #2. You can have many of these, since they get from one queue and put into another queue safely.

  3. Get stuff out of Queue #2 and write it to a file. You must have exactly 1 of these and no more. It "owns" the file, guarantees atomic access, and absolutely assures that the file is written cleanly and consistently.
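
A minimal sketch of that structure, using sentinel values to shut each stage down; the SENTINEL marker and the nworkers bookkeeping below are illustrative additions, not taken from the answer:

import multiprocessing
from multiprocessing import Process, Queue

SENTINEL = "STOP"   # illustrative end-of-stream marker

def feed(queue, parlist, nworkers):
    # Stage 1: fill Queue #1, then send one sentinel per calculator.
    for par in parlist:
        queue.put(par)
    for _ in range(nworkers):
        queue.put(SENTINEL)

def calc(queue_in, queue_out):
    # Stage 2: many of these can run safely side by side.
    while True:
        par = queue_in.get()             # blocking get: no empty-queue race
        if par == SENTINEL:
            queue_out.put(SENTINEL)      # tell the writer this worker is done
            break
        queue_out.put((par, par * par))  # stand-in for doCalculation(par)

def write(queue, fname, nworkers):
    # Stage 3: exactly one of these; it alone owns the file.
    fh = open(fname, "w")
    done = 0
    while done < nworkers:
        item = queue.get()
        if item == SENTINEL:
            done += 1                    # count calculators that finished
        else:
            fh.write("%s %s\n" % item)
    fh.close()

if __name__ == "__main__":
    nworkers = multiprocessing.cpu_count()
    workerQueue, writerQueue = Queue(), Queue()
    feedProc = Process(target = feed, args = (workerQueue, list(range(10)), nworkers))
    calcProc = [Process(target = calc, args = (workerQueue, writerQueue)) for _ in range(nworkers)]
    writProc = Process(target = write, args = (writerQueue, "foo", nworkers))
    for p in [feedProc, writProc] + calcProc:
        p.start()
    for p in [feedProc, writProc] + calcProc:
        p.join()

Because every stage blocks on get() and exits only on a sentinel, no output is lost regardless of how slowly the feeder fills the queue, and the single writer guarantees the file is written cleanly.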
