Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs

Problem description

I'm trying to complete 100 model runs on my 8-processor 64-bit Windows 7 machine. I'd like to run 7 instances of the model concurrently to decrease my total run time (approx. 9.5 min per model run). I've looked at several threads pertaining to the Multiprocessing module of Python, but am still missing something.

Using the multiprocessing module

How to spawn parallel child processes on a multi-processor system?

Python multiprocessing queue

My process:

I have 100 different parameter sets I'd like to run through SEAWAT/MODFLOW to compare the results. I have pre-built the model input files for each model run and stored them in their own directories. What I'd like to be able to do is have 7 models running at a time until all realizations have been completed. There needn't be communication between processes or display of results. So far I have only been able to spawn the models sequentially:

import os,subprocess
import multiprocessing as mp

ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
    if f.endswith('.npy'):
        files.append(f)

## def work(cmd):
##     return subprocess.call(cmd, shell=False)

def run(f,def_param=ws):
    real = f.split('_')[2].split('.')[0]
    print 'Realization %s' % real

    mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
    mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
    seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '

    exe = seawatV4x64
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real

    os.system( exe + swt_nam )


if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    tasks = range(len(files))
    results = []
    for f in files:
        r = p.map_async(run(f), tasks, callback=results.append)
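
The reason this spawns the models sequentially: run(f) is evaluated in the parent process before map_async() ever sees it, so each model runs to completion in the main loop, and map_async() then receives run()'s None return value in place of a function. The pool needs the function itself plus an iterable of its arguments; a minimal sketch of the intended call, reusing run and files from above:

p = mp.Pool(processes=mp.cpu_count()-1)
p.map(run, files)  # blocking map(): each worker process calls run(filename)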

I changed the if __name__ == 'main': block to the following in the hope that it would fix the lack of parallelism that I feel the for loop imparts on the above script. However, the model fails to even run (no Python error):

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    p.map_async(run,((files[f],) for f in range(len(files))))
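
Two things go wrong in this version. Each queued item is a 1-tuple, so run() receives a tuple rather than a filename string and its f.split(...) raises inside the worker; because the AsyncResult returned by map_async() is never retrieved with get(), that exception is swallowed silently, which is why no Python error appears. Also, map_async() returns immediately, so the script reaches the end of the if block and the pool is torn down before any model can run. A minimal sketch that blocks until all runs finish:

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1)
    result = p.map_async(run, files)  # pass the function and the plain filenames
    p.close()  # no more tasks will be submitted
    p.join()   # wait for every queued run to finish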

Any and all help is greatly appreciated!

Edit 3/26/2012 13:31 EST

Using the "Manual Pool" method in @J.F. Sebastian's answer below I get parallel execution of my external .exe. Model realizations are called up in batches of 8 at a time, but it doesn't wait for those 8 runs to complete before calling up the next batch and so on:

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam])

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()

    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':

    mp.freeze_support() # optional if the program is not frozen
    main()

No error traceback is available. The run() function performs its duty when called on a single model realization file, just as it does with multiple files. The only difference is that with multiple files it is called len(files) times, yet each of the instances immediately closes; only one model run is allowed to finish, at which point the script exits gracefully (exit code 0).

Adding some print statements to main() reveals some information about active thread counts as well as thread status (note that this is a test on only 8 of the realization files to make the screenshot more manageable; theoretically all 8 files should run concurrently, yet the behavior continues where they are spawned and immediately die, except for one):

import threading  # needed below for threading.activeCount()

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\test')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    print('Active Count a',threading.activeCount())
    for _ in threads:
        print(_)
        q.put_nowait(None) # signal no more files
    for t in threads: 
        print(t)
        t.join() # wait for completion
    print('Active Count b',threading.activeCount())

**The line reading "D:\\Data\\Users..." is the error message thrown when I manually stop a model from running to completion. Once I stop the model, the remaining thread-status lines are reported and the script exits.

Edit 3/26/2012 16:24 EST

SEAWAT does allow concurrent execution; I've done this in the past, spawning instances manually using iPython and launching from each model file folder. This time around, I'm launching all model runs from a single location, namely the directory where my script resides. It looks like the culprit may be the way SEAWAT saves some of its output. When SEAWAT runs, it immediately creates files pertaining to the model run. One of these files was not being saved to the directory in which the model realization is located, but to the top directory where the script resides. This prevents any subsequent thread from saving the same file name in the same location (which they all want to do, since these filenames are generic and non-specific to each realization). The SEAWAT windows were not staying open long enough for me to read or even see that there was an error message; I only realized this when I went back and tried to run the code using iPython, which directly displays the printout from SEAWAT instead of opening a new window to run the program.

I am accepting @J.F. Sebastian's answer as it is likely that once I resolve this model-executable issue, the threading code he has provided will get me where I need to be.

Final Code

Added the cwd argument to subprocess.check_call to start each instance of SEAWAT in its own directory. Very key.

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe'
    cwd = ws + r'\reals\real%s\ss' % real
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':
    mp.freeze_support() # optional if the program is not frozen
    main()

Recommended Answer

I don't see any computations in the Python code. If you just need to execute several external programs in parallel, it is sufficient to use subprocess to run the programs and the threading module to maintain a constant number of processes running, but the simplest code uses multiprocessing.Pool:

#!/usr/bin/env python
import os
import multiprocessing as mp

def run(filename_def_param): 
    filename, def_param = filename_def_param # unpack arguments
    ... # call external program on `filename`

def safe_run(*args, **kwargs):
    """Call run(), catch exceptions."""
    try: run(*args, **kwargs)
    except Exception as e:
        print("error: %s run(*%r, **%r)" % (e, args, kwargs))

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    workdir = os.path.join(ws, r'fieldgen\reals')
    files = ((os.path.join(workdir, f), ws)
             for f in os.listdir(workdir) if f.endswith('.npy'))

    # start processes
    pool = mp.Pool() # use all available CPUs
    pool.map(safe_run, files)

if __name__=="__main__":
    mp.freeze_support() # optional if the program is not frozen
    main()

If there are many files then pool.map() could be replaced by for _ in pool.imap_unordered(safe_run, files): pass.

There is also multiprocessing.dummy.Pool, which provides the same interface as multiprocessing.Pool but uses threads instead of processes, and which might be more appropriate in this case.
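
Since each worker merely blocks while the external .exe runs, threads are sufficient, and the switch is a one-line import change. A minimal sketch, reusing safe_run and files from the example above:

from multiprocessing.dummy import Pool  # same interface as multiprocessing.Pool, backed by threads

pool = Pool(8)  # one worker thread per concurrent SEAWAT run
pool.map(safe_run, files)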

You don't need to keep some CPUs free. Just use a command that starts your executables with a low priority (on Linux it is the nice program).
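
A sketch of such a priority-lowering wrapper; check_call_low_priority is a hypothetical helper, 0x00004000 is the Win32 BELOW_NORMAL_PRIORITY_CLASS value (subprocess only gained a named constant for it in Python 3.7), and nice(1) covers the POSIX case:

import os
import subprocess

BELOW_NORMAL_PRIORITY_CLASS = 0x00004000  # Win32 process-priority flag

def check_call_low_priority(cmd, **kwargs):
    """Run an external command at reduced priority (hypothetical helper)."""
    if os.name == 'nt':
        # creationflags is passed straight through to CreateProcess()
        kwargs['creationflags'] = BELOW_NORMAL_PRIORITY_CLASS
        return subprocess.check_call(cmd, **kwargs)
    return subprocess.check_call(['nice'] + list(cmd), **kwargs)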

concurrent.futures.ThreadPoolExecutor would be both simple and sufficient, but on Python 2.x it requires a third-party dependency (the futures backport on PyPI; it has been in the stdlib since Python 3.2).

#!/usr/bin/env python
import os
import concurrent.futures

def run(filename, def_param):
    ... # call external program on `filename`

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy'))

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    future_to_file = dict((executor.submit(run, f, ws), f) for f in files)

    for future in concurrent.futures.as_completed(future_to_file):
        f = future_to_file[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (f, future.exception()))
        # run() doesn't return anything so `future.result()` is always `None`

Or if we ignore exceptions raised by run():

from itertools import repeat

... # the same

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
     executor.map(run, files, repeat(ws))
     # run() doesn't return anything so `map()` results can be ignored

subprocess + threading (manual pool) solution

#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue
from threading import Thread

def run(filename, def_param):
    ... # define exe, swt_nam
    subprocess.check_call([exe, swt_nam]) # run external program

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

# start threads
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
    t.daemon = True # threads die if the program dies
    t.start()

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
    if f.endswith('.npy'):
        q.put_nowait((os.path.join(wdir, f), ws))

for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
