end daemon processes with multiprocessing module


Question


I include an example usage of multiprocessing below. This is a process pool model. It is not as simple as it might be, but is relatively close in structure to the code I'm actually using. It also uses sqlalchemy, sorry.


My question is - I currently have a situation where I have a relatively long-running Python script which is executing a number of functions which each look like the code below, so the parent process is the same in all cases. In other words, multiple pools are created by one Python script. (I don't have to do it this way, I suppose, but the alternative is to use something like os.system and subprocess.) The problem is that these processes hang around and hold on to memory. The docs say these daemon processes are supposed to stick around till the parent process exits, but what happens if the parent process then goes on to generate another pool of processes and doesn't exit immediately?


Calling terminate() works, but this doesn't seem terribly polite. Is there a good way to ask the processes to terminate nicely? I.e. "clean up after yourself and go away now, I need to start up the next pool"?


I also tried calling join() on the processes. According to the documentation this means wait for the processes to terminate. What if they don't plan to terminate? What actually happens is that the process hangs, since each worker loops forever on queue.get() and nothing ever tells it to stop.

Thanks.

Regards, Faheem.

import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given task queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True  # daemon workers are killed when the parent exits

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "trying %s with args %s"%(func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()        # <--- terminate used here
            worker.join()              # <--- join used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    # recreate the engine inside the worker process; engines and
    # connections should not be shared across a fork
    dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );"%i)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );"%i)
    Session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()

Answer


You know multiprocessing already has classes for worker pools, right?
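
For comparison, here is a minimal sketch using the built-in multiprocessing.Pool; the work function and the pool size are illustrative, not taken from the question. close() followed by join() is the polite shutdown being asked about: the workers exit on their own once every submitted task has finished.

import multiprocessing

def work(i):
    # stand-in task for illustration only
    return i * i

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)       # five worker processes
    results = pool.map(work, range(10))  # blocks until all tasks complete
    pool.close()                         # no further tasks will be submitted
    pool.join()                          # workers terminate cleanly here
    print results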


The standard way is to send your threads a quit signal:

queue.put(("QUIT", None, None))

Then check for it in the worker:

if func == "QUIT":
    return
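
Put together with the Worker and ProcessPool classes from the question, the sentinel approach looks roughly like this. This is a sketch, not the poster's code: queuing one sentinel per worker, and having each worker call task_done() on the sentinel so that queue.join() can still return, are details added here.

import multiprocessing

class Worker(multiprocessing.Process):
    """Consumes tasks from a shared queue until it sees a QUIT sentinel."""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            if func == "QUIT":
                # acknowledge the sentinel too, or queue.join() never returns
                self.queue.task_done()
                return
            try:
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of processes consuming tasks from a queue"""
    def __init__(self, num_workers):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = [Worker(self.queue, i) for i in range(num_workers)]

    def add_task(self, func, *args, **kargs):
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for all tasks, then ask the workers to quit nicely."""
        for _ in self.workerlist:
            self.queue.put(("QUIT", None, None))  # one sentinel per worker
        self.queue.join()                         # every get() acknowledged
        for worker in self.workerlist:
            worker.join()                         # returns promptly now

With this in place, wait_completion() drains the queue, the workers exit on their own, and the final join() calls return promptly instead of hanging.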

