Progress measuring with python's multiprocessing Pool and map function


Problem description

I'm a beginner with the multiprocessing module, so please bear with me. I'm using the following code for parallel CSV processing:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp

def init_worker(x):
  sleep(.5)
  print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rt")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    results = p.map(init_worker, csvReader, chunksize = 10)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()
    p.terminate()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # WRITE RESULTS TO OUTPUT FILE
  [csvWriter.writerow(row) for row in results]

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(results)
  # print len(results)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

I would like to somehow measure the progress of the script (just plain text, not any fancy ASCII art). The one option that comes to my mind is to compare the lines that were successfully processed by init_worker against all the lines in input.csv, and print the actual state, e.g. every second. Can you please point me to the right solution? I've found several articles with a similar problem, but I was not able to adapt them to my needs because none of them used the Pool class and the map method. I would also like to ask about the p.close(), p.join() and p.terminate() methods: I've seen them used mainly with the Process class, not Pool. Are they necessary with the Pool class, and have I used them correctly? Using p.terminate() was intended to kill the process with ctrl+c, but that is a different story which does not have a happy ending yet. Thank you.
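
To make the counting idea concrete, here is a rough, untested sketch of what I have in mind (it assumes the workers are forked on a POSIX system so the module-level counter is shared with them; progress_counter is just an illustrative name, not part of my working code):

#!/usr/bin/env python
# Sketch of the "count processed rows vs. total rows" idea.
# Assumes a POSIX fork start method so the module-level shared counter is
# inherited by the Pool workers.
import csv
from time import sleep
from multiprocessing import Pool, Value, cpu_count

progress_counter = Value('i', 0)   # shared between parent and forked workers

def init_worker(x):
  sleep(.5)
  x.append(int(x[0]) ** 2)
  with progress_counter.get_lock():
    progress_counter.value += 1     # mark one more row as done
  return x

if __name__ == '__main__':
  rows = [row for row in csv.reader(open("input.csv", "rt"))]
  total = len(rows)
  p = Pool(processes=cpu_count())
  async_result = p.map_async(init_worker, rows, chunksize=10)
  while not async_result.ready():   # poll roughly once a second
    print "%d/%d rows processed" % (progress_counter.value, total)
    sleep(1)
  p.close()
  p.join()
  results = async_result.get()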

PS: My input.csv looks like this, if it matters:

0,0
1,3
2,6
3,9
...
...
48,144
49,147

PPS: as I said, I'm a newbie with multiprocessing and the code I've put together just works. The one drawback I can see is that the whole csv is stored in memory, so if you have a better idea do not hesitate to share it.

Edit

in reply to @J.F.Sebastian

Here is my actual code based on your suggestions:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm

def do_job(x):
  sleep(.5)
  # print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):

  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
      csvWriter.writerow(result)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(result)
  # print len(result)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

Here is the output of tqdm:

1 [elapsed: 00:05,  0.20 iters/sec]

What does this output mean? On the page you've referred to, tqdm is used in a loop in the following way:

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
...     time.sleep(1)
... 
|###-------| 35/100  35% [elapsed: 00:35 left: 01:05,  1.00 iters/sec]

This output makes sense, but what does my output mean? Also, it does not seem that the ctrl+c problem is fixed: after hitting ctrl+c the script throws a Traceback, and if I hit ctrl+c again I get a new Traceback, and so on. The only way to kill it is to send it to the background (ctrl+z) and then kill it (kill %1).

Solution

To show the progress, replace pool.map with pool.imap_unordered:

from tqdm import tqdm # $ pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
    csvWriter.writerow(result)

The tqdm part is optional; see Text Progress Bar in the Console.
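
Note that imap_unordered returns an iterator with no known length, so tqdm can only show a running item count and rate; that is why the output shown in the question reads 1 [elapsed: 00:05, 0.20 iters/sec] (one result received so far) rather than a percentage bar. If you count the rows up front you can pass the total yourself, e.g. (a sketch; total is a standard tqdm parameter, but check your tqdm version):

from tqdm import tqdm

total_rows = sum(1 for line in open("input.csv")) - skipRows  # rows to be processed
for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10), total=total_rows):
    csvWriter.writerow(result)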

Incidentally, it also fixes your "whole csv is stored in memory" and "KeyboardInterrupt is not raised" problems: imap_unordered consumes the input iterator lazily and yields each result as it completes, instead of building the full results list first.

Here's a complete code example:

#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                        datefmt="%F %T", level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
            logging.debug(square) # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()

You should see the output in batches every .5 * chunksize seconds. If you press Ctrl+C, you should see KeyboardInterrupt raised in the child processes and in the main process. In Python 3, the main process exits immediately. In Python 2, the KeyboardInterrupt is delayed until the next batch should have been printed (a bug in Python).
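
Applied back to the question's CSV code, the same pattern could look roughly like this (a sketch rather than a drop-in replacement: it reuses the question's do_job, feeds rows to the workers lazily, and writes each result as soon as it arrives, so the whole csv never sits in memory):

#!/usr/bin/env python
# Sketch: the imap_unordered / terminate pattern from the example above
# applied to the CSV processing from the question (names taken from the
# question's code).
import csv
from time import sleep
from multiprocessing import Pool, cpu_count

def do_job(x):
    sleep(.5)
    x.append(int(x[0]) ** 2)
    return x

if __name__ == '__main__':
    inputFH  = open("input.csv", "rb")
    outputFH = open("output.csv", "wt")
    csvReader = csv.reader(inputFH)
    csvWriter = csv.writer(outputFH, lineterminator='\n')
    csvWriter.writerow(["Default", "header", "please", "change"])

    pool = Pool(processes=cpu_count())
    try:
        # write each row as soon as its result is ready
        for row in pool.imap_unordered(do_job, csvReader, chunksize=10):
            csvWriter.writerow(row)
    except KeyboardInterrupt:
        pass  # fall through to terminate()/join() below
    finally:
        pool.terminate()
        pool.join()
        inputFH.close()
        outputFH.close()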

