Python Multiprocessing using Queue to write to same file


Problem Description

I know there are many posts on Stack Exchange about writing results from multiprocessing to a single file, and I developed my code after reading those posts. What I am trying to achieve is to run the 'RevMapCoord' function in parallel and write its results to one single file using a multiprocessing Queue. But I am having a problem while queuing my jobs. My code:

from multiprocessing import Process, Queue

def RevMapCoord(list):
    "Read a file, Find String and Do something"

def feed(queue, parlist):
    for par in parlist:
        print('Echo from Feeder: %s' % (par))
        queue.put(par)
    print('**Feeder finished queuing**')

def calc(queueIn, queueOut):
    print('Worker function started')
    while True:
        try:
            par = queueIn.get(block=False)
            res = RevMapCoord(final_res)
            queueOut.put((par, res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block=False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()


feedProc = Process(target=feed, args=(workerQueue, final_res))
calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target=write, args=(writerQueue, sco_inp_extend_geno))

feedProc.start()
print('Feeder is joining')
feedProc.join()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join()

When I run this script, it gets stuck at the 'feedProc.start()' step. The last few output lines on screen show the print statements from the end of 'feedProc.start()':

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queuing**

But it hangs before executing the next line, 'feedProc.join()'. The code gives no error and keeps on running but does nothing (hangs). Please tell me what mistake I am making.

Solution

I achieved writing results from multiprocessing to a single file by using the 'map_async' function in Python 3. Here is the function I wrote:

from multiprocessing import Pool

def PPResults(module, alist):  # parallel processing
    npool = Pool(int(nproc))             # nproc: number of worker processes
    res = npool.map_async(module, alist)
    results = res.get()                  # blocks until done; results returned as a list
    return results

So, I provide this function with a list of parameters in 'alist', and 'module' is a function that does the processing and returns a result. The function collects the results in a list and returns it once all the parameters from 'alist' have been processed. Note that 'map_async' actually returns the results in the same order as the input list; in any case, order was not important for me, so this worked well. The 'results' list can then be iterated and the individual results written to a file like:

with open('./TestResults', 'w') as fh_out:
    for i in results:  # write each result from the list to the file
        fh_out.write(i)
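
Putting it together, here is a hypothetical end-to-end usage sketch, assuming 'PPResults' as defined above; the worker function 'square', 'nproc = 4', and the output file name are illustrative assumptions, not from the original post:

nproc = 4                       # number of worker processes read by PPResults

def square(x):                  # hypothetical stand-in for the real processing function
    return '%d\n' % (x * x)     # returns one newline-terminated line of text per input

if __name__ == '__main__':
    results = PPResults(square, list(range(10)))
    with open('./TestResults', 'w') as fh_out:
        for r in results:       # results arrive in input order
            fh_out.write(r)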

To stream results as they complete, or for finer control over the pipeline, one could use 'queues' similar to those I mentioned in my question (above). I was able to fix that code as well, although it was not needed for my use case.
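
For reference, here is a minimal sketch (placeholder worker, assumed NPROC; not the original project code) of one conventional way to fix the queue pipeline from the question. The original likely hangs because 'feedProc.join()' is called before any worker has started: the Queue's feeder thread cannot flush its buffered items into the pipe while nothing consumes them, so the feeder process never terminates. The 'get(block=False)' inside a bare 'except' is also fragile, since workers quit the moment the queue is briefly empty. The fix is to start the consumers before joining the feeder and to shut down with sentinel values:

from multiprocessing import Process, Queue

NPROC = 4

def feed(queue, parlist):
    for par in parlist:
        queue.put(par)
    for _ in range(NPROC):              # one shutdown sentinel per worker
        queue.put(None)

def calc(queue_in, queue_out):
    while True:
        par = queue_in.get()            # blocking get: waits for work
        if par is None:                 # sentinel: no more work
            break
        res = par.upper()               # placeholder for RevMapCoord(par)
        queue_out.put((par, res))
    queue_out.put(None)                 # tell the writer this worker is done

def write(queue, fname):
    finished = 0
    with open(fname, 'w') as fh:
        while finished < NPROC:         # run until every worker has signalled
            item = queue.get()
            if item is None:            # one worker has finished
                finished += 1
                continue
            par, res = item
            fh.write('%s %s\n' % (par, res))

if __name__ == '__main__':
    workerQueue, writerQueue = Queue(), Queue()
    feedProc = Process(target=feed, args=(workerQueue, ['a', 'b', 'c']))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue))
                for _ in range(NPROC)]
    writProc = Process(target=write, args=(writerQueue, 'out.txt'))
    feedProc.start()
    for p in calcProc:
        p.start()                       # start workers BEFORE joining the feeder
    writProc.start()
    feedProc.join()                     # safe now: workers are draining the queue
    for p in calcProc:
        p.join()
    writProc.join()

If the order of the written results mattered, each queued item could also carry its input index so the writer (or a final pass over the file) can sort by it.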

Thanks

AK
