Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs


Question


I'm trying to complete 100 model runs on my 8-processor 64-bit Windows 7 machine. I'd like to run 7 instances of the model concurrently to decrease my total run time (approx. 9.5 min per model run). I've looked at several threads pertaining to the Multiprocessing module of Python, but am still missing something.

Using the multiprocessing module

How to spawn parallel child processes on a multi-processor system?

Python Multiprocessing queue

My Process:

I have 100 different parameter sets I'd like to run through SEAWAT/MODFLOW to compare the results. I have pre-built the model input files for each model run and stored them in their own directories. What I'd like to be able to do is have 7 models running at a time until all realizations have been completed. There needn't be communication between processes or display of results. So far I have only been able to spawn the models sequentially:

import os,subprocess
import multiprocessing as mp

ws = r'D:\Data\Users\jbellino\Projects\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
    if f.endswith('.npy'):
        files.append(f)

## def work(cmd):
##     return subprocess.call(cmd, shell=False)

def run(f,def_param=ws):
    real = f.split('_')[2].split('.')[0]
    print 'Realization %s' % real

    mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
    mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
    seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '

    exe = seawatV4x64
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real

    os.system( exe + swt_nam )


if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    tasks = range(len(files))
    results = []
    for f in files:
        r = p.map_async(run(f), tasks, callback=results.append)

I changed the if __name__ == '__main__': block to the following in hopes it would fix the lack of parallelism that I feel the for loop is imparting on the above script. However, the model fails to even run (no Python error):

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    p.map_async(run,((files[f],) for f in range(len(files))))
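
My current guess at what a blocking, parallel version would look like (untested: map_async() returns immediately, so without close()/join() the script can exit before any worker starts, and run(f) in the first snippet executes the function right away in the parent and hands its None result to the pool):

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    p.map_async(run, files)  # pass the function itself, not run(f)
    p.close()                # no more tasks will be submitted
    p.join()                 # block until all model runs finish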

Any and all help is greatly appreciated!

EDIT 3/26/2012 13:31 EST

Using the "Manual Pool" method in @J.F. Sebastian's answer below, I get parallel execution of my external .exe. Model realizations are called up in batches of 8 at a time, but it doesn't wait for those 8 runs to complete before calling up the next batch, and so on:

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam])

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Projects\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()

    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':

    mp.freeze_support() # optional if the program is not frozen
    main()

No error traceback is available. The run() function performs its duty when called on a single model realization file, just as it does with multiple files. The only difference is that with multiple files it is called len(files) times, yet each of the instances immediately closes; only one model run is allowed to finish, at which point the script exits gracefully (exit code 0).

Adding some print statements to main() reveals information about active thread counts as well as thread status (note that this is a test on only 8 of the realization files, to make the screenshot more manageable; theoretically all 8 files should run concurrently, yet the behavior continues: they are spawned and immediately die, except one):

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Projects\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\test')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    print('Active Count a',threading.activeCount())
    for _ in threads:
        print(_)
        q.put_nowait(None) # signal no more files
    for t in threads: 
        print(t)
        t.join() # wait for completion
    print('Active Count b',threading.activeCount())

The line which reads "D:\Data\Users..." is the error information thrown when I manually stop the model from running to completion. Once I stop the model running, the remaining thread status lines are reported and the script exits.

EDIT 3/26/2012 16:24 EST

SEAWAT does allow concurrent execution; I've done this in the past, spawning instances manually using iPython and launching from each model file folder. This time around, I'm launching all model runs from a single location, namely the directory where my script resides. It looks like the culprit may be the way SEAWAT saves some of the output. When SEAWAT is run, it immediately creates files pertaining to the model run. One of these files is being saved not to the directory where the model realization is located, but to the top directory where the script is located. This prevents any subsequent threads from saving the same file name in the same location (which they all want to do, since these filenames are generic and non-specific to each realization). The SEAWAT windows were not staying open long enough for me to read, or even see, that there was an error message; I only realized this when I went back and tried to run the code using iPython, which displays the printout from SEAWAT directly instead of opening a new window to run the program.

I am accepting @J.F. Sebastian's answer as it is likely that once I resolve this model-executable issue, the threading code he has provided will get me where I need to be.

FINAL CODE

Added the cwd argument to subprocess.check_call to start each instance of SEAWAT in its own directory. Very key.

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    cwd = ws + r'\reals\real%s\ss' % real
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Projects\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':
    mp.freeze_support() # optional if the program is not frozen
    main()

Solution

I don't see any computations in the Python code. If you just need to execute several external programs in parallel, it is sufficient to use subprocess to run the programs and the threading module to maintain a constant number of processes running, but the simplest code uses multiprocessing.Pool:

#!/usr/bin/env python
import os
import multiprocessing as mp

def run(filename_def_param): 
    filename, def_param = filename_def_param # unpack arguments
    ... # call external program on `filename`

def safe_run(*args, **kwargs):
    """Call run(), catch exceptions."""
    try: run(*args, **kwargs)
    except Exception as e:
        print("error: %s run(*%r, **%r)" % (e, args, kwargs))

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Projects\stJohnsDeepening\model\xsec_a'
    workdir = os.path.join(ws, r'fieldgen\reals')
    files = ((os.path.join(workdir, f), ws)
             for f in os.listdir(workdir) if f.endswith('.npy'))

    # start processes
    pool = mp.Pool() # use all available CPUs
    pool.map(safe_run, files)

if __name__=="__main__":
    mp.freeze_support() # optional if the program is not frozen
    main()

If there are many files then pool.map() could be replaced by for _ in pool.imap_unordered(safe_run, files): pass.
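
For example, a rough progress counter could be built on the lazy variant (a sketch assuming the same pool and files as above):

for i, _ in enumerate(pool.imap_unordered(safe_run, files), 1):
    print('completed %d model runs' % i)  # runs finish in arbitrary order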

There is also multiprocessing.dummy.Pool, which provides the same interface as multiprocessing.Pool but uses threads instead of processes; that might be more appropriate in this case.
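
A minimal sketch of that swap, assuming the same safe_run() and files as above (only the import changes; the pool size of 7 matches the question):

from multiprocessing.dummy import Pool  # same API as mp.Pool, backed by threads

pool = Pool(7)             # seven concurrent model runs
pool.map(safe_run, files)  # blocks until all runs finish
pool.close()
pool.join()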

You don't need to keep some CPUs free. Just use a command that starts your executables with a low priority (on Linux it is the nice program).
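
On Windows, a comparable (untested) sketch passes the creationflags argument through subprocess.check_call to Popen; the hypothetical run_low_priority() helper below uses the Win32 BELOW_NORMAL_PRIORITY_CLASS value:

import subprocess

BELOW_NORMAL_PRIORITY_CLASS = 0x00004000  # Win32 process-priority flag

def run_low_priority(exe, swt_nam, cwd):
    # start the external program below normal priority (Windows-only flag)
    subprocess.check_call([exe, swt_nam], cwd=cwd,
                          creationflags=BELOW_NORMAL_PRIORITY_CLASS)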

ThreadPoolExecutor example

concurrent.futures.ThreadPoolExecutor would be both simple and sufficient, but it requires a third-party dependency on Python 2.x (it has been in the stdlib since Python 3.2).

#!/usr/bin/env python
import os
import concurrent.futures

def run(filename, def_param):
    ... # call external program on `filename`

# populate files
ws = r'D:\Data\Users\jbellino\Projects\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy'))

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    future_to_file = dict((executor.submit(run, f, ws), f) for f in files)

    for future in concurrent.futures.as_completed(future_to_file):
        f = future_to_file[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (f, future.exception()))
        # run() doesn't return anything so `future.result()` is always `None`

Or if we ignore exceptions raised by run():

from itertools import repeat

... # the same

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
     executor.map(run, files, repeat(ws))
     # run() doesn't return anything so `map()` results can be ignored

subprocess + threading (manual pool) solution

#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue
from threading import Thread

def run(filename, def_param):
    ... # define exe, swt_nam
    subprocess.check_call([exe, swt_nam]) # run external program

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

# start threads
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
    t.daemon = True # threads die if the program dies
    t.start()

# populate files
ws = r'D:\Data\Users\jbellino\Projects\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
    if f.endswith('.npy'):
        q.put_nowait((os.path.join(wdir, f), ws))

for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
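
For reference, the two-argument iter(callable, sentinel) form used in worker() keeps calling queue.get() until it returns the None sentinel, which is why one None per thread is enqueued at the end. A minimal illustration:

from Queue import Queue

q = Queue()
for item in (1, 2, None):
    q.put(item)
print(list(iter(q.get, None)))  # -> [1, 2]; iteration stops at the None sentinel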
