Progress measuring with python's multiprocessing Pool and map function
Question
I'm a beginner with the multiprocessing module, so please bear with me. I'm using the following code for parallel CSV processing:
#!/usr/bin/env python
import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp

def init_worker(x):
    sleep(.5)
    print "(%s,%s)" % (x[0], x[1])
    x.append(int(x[0])**2)
    return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows=0, cpuCount=1):
    # OPEN FH FOR READING INPUT FILE
    inputFH = open(inputFile, "rt")
    csvReader = csv.reader(inputFH, delimiter=separator)
    # SKIP HEADERS
    for skip in xrange(skipRows):
        csvReader.next()
    # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
    try:
        p = Pool(processes=cpuCount)
        results = p.map(init_worker, csvReader, chunksize=10)
        p.close()
        p.join()
    except KeyboardInterrupt:
        p.close()
        p.join()
        p.terminate()
    # CLOSE FH FOR READING INPUT
    inputFH.close()
    # OPEN FH FOR WRITING OUTPUT FILE
    outputFH = open(outputFile, "wt")
    csvWriter = csv.writer(outputFH, lineterminator='\n')
    # WRITE HEADER TO OUTPUT FILE
    csvWriter.writerow(header)
    # WRITE RESULTS TO OUTPUT FILE
    [csvWriter.writerow(row) for row in results]
    # CLOSE FH FOR WRITING OUTPUT
    outputFH.close()
    print pp(results)
    # print len(results)

def main():
    inputFile = "input.csv"
    outputFile = "output.csv"
    parallel_csv_processing(inputFile, outputFile, cpuCount=cpu_count())

if __name__ == '__main__':
    main()
I would like to somehow measure the progress of the script (just plain text, not any fancy ASCII art). The one option that comes to my mind is to compare the number of lines successfully processed by init_worker to the total number of lines in input.csv, and print the actual state e.g. every second. Can you please point me to the right solution? I've found several articles about similar problems, but I was not able to adapt them to my needs because none of them used the Pool class and the map method. I would also like to ask about the p.close(), p.join(), and p.terminate() methods. I've seen them mainly with the Process class, not the Pool class; are they necessary with the Pool class, and have I used them correctly? The use of p.terminate() was intended to kill the process with ctrl+c, but that is a different story which does not have a happy ending yet. Thank you.
PS: My input.csv looks like this, if it matters:
0,0
1,3
2,6
3,9
...
...
48,144
49,147
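(The full file is just i,3*i for i in 0..49; a throwaway Python 3 snippet to regenerate it, in case anyone wants to reproduce:)

```python
import csv

# Regenerate the sample input.csv: 50 rows of the form "i,3*i"
with open("input.csv", "w", newline="") as fh:
    writer = csv.writer(fh)
    for i in range(50):
        writer.writerow([i, 3 * i])
```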
PPS: as I said, I'm a newbie with multiprocessing and the code I've put together just works. The one drawback I can see is that the whole CSV is stored in memory, so if you have a better idea, do not hesitate to share it.
Edit
in reply to @J.F.Sebastian
Here is my actual code based on your suggestions:
#!/usr/bin/env python
import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm

def do_job(x):
    sleep(.5)
    # print "(%s,%s)" % (x[0],x[1])
    x.append(int(x[0])**2)
    return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows=0, cpuCount=1):
    # OPEN FH FOR READING INPUT FILE
    inputFH = open(inputFile, "rb")
    csvReader = csv.reader(inputFH, delimiter=separator)
    # SKIP HEADERS
    for skip in xrange(skipRows):
        csvReader.next()
    # OPEN FH FOR WRITING OUTPUT FILE
    outputFH = open(outputFile, "wt")
    csvWriter = csv.writer(outputFH, lineterminator='\n')
    # WRITE HEADER TO OUTPUT FILE
    csvWriter.writerow(header)
    # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
    try:
        p = Pool(processes=cpuCount)
        # results = p.map(do_job, csvReader, chunksize = 10)
        for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
            csvWriter.writerow(result)
        p.close()
        p.join()
    except KeyboardInterrupt:
        p.close()
        p.join()
    # CLOSE FH FOR READING INPUT
    inputFH.close()
    # CLOSE FH FOR WRITING OUTPUT
    outputFH.close()
    print pp(result)
    # print len(result)

def main():
    inputFile = "input.csv"
    outputFile = "output.csv"
    parallel_csv_processing(inputFile, outputFile, cpuCount=cpu_count())

if __name__ == '__main__':
    main()
Here is the output of tqdm:
1 [elapsed: 00:05, 0.20 iters/sec]
What does this output mean? On the page you've referred to, tqdm is used in a loop in the following way:
>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
... time.sleep(1)
...
|###-------| 35/100 35% [elapsed: 00:35 left: 01:05, 1.00 iters/sec]
This output makes sense, but what does my output mean? Also, it does not seem that the ctrl+c problem is fixed: after hitting ctrl+c the script throws some traceback; if I hit ctrl+c again, I get a new traceback, and so on. The only way to kill it is to send it to the background (ctrl+z) and then kill it (kill %1).
To show the progress, replace pool.map with pool.imap_unordered:
from tqdm import tqdm  # $ pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
    csvWriter.writerow(result)
The tqdm part is optional; see Text Progress Bar in the Console.
Accidentally, it fixes your "whole csv is stored in memory" and "KeyboardInterrupt is not raised" problems.
Here's a complete code example:
#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                        datefmt="%F %T", level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
            logging.debug(square)  # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()
You should see the output in batches every .5 * chunksize seconds. If you press Ctrl+C, you should see KeyboardInterrupt raised in the child processes and in the main process. In Python 3, the main process exits immediately. In Python 2, the KeyboardInterrupt is delayed until the next batch should have been printed (a bug in Python).