python multiprocessing/threading cleanup

Question

I have a python tool, that has basically this kind of setup:

main process (P1) -> spawns a process (P2) that starts a tcp connection
                  -> spawns a thread (T1) that starts a loop to receive 
                     messages that are sent from P2 to P1 via a Queue (Q1)

server process (P2) -> spawns two threads (T2 and T3) that start loops to
                       receive messages that are sent from P1 to P2 via Queues (Q2 and Q3)

The problem I'm having is that when I stop my program (with Ctrl+C), it doesn't quit. The server process is ended, but the main process just hangs there and I have to kill it.

The thread loop functions all look the same:

def _loop(self):
    while self.running:
        res = self.Q1.get()
        if res is None:
            break
        self._handle_msg(res)
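
The loop above is the usual sentinel pattern: a consumer blocks on get() and stops when it receives None. As a minimal, self-contained sketch of just that pattern (assuming Python 3, the standard-library queue.Queue instead of the tool's multiprocessing queues, and the illustrative names consume and handled), the same loop can be written with iter(callable, sentinel):

```python
import queue
import threading


def consume(q, handled):
    # iter(q.get, None) calls q.get() repeatedly and stops as soon as it
    # returns the sentinel None, so no separate running flag is needed
    for msg in iter(q.get, None):
        handled.append(msg)


q = queue.Queue()
handled = []
t = threading.Thread(target=consume, args=(q, handled))
t.start()

for msg in ("a", "b", "c"):
    q.put(msg)
q.put(None)        # sentinel: everything queued before it is still handled
t.join(timeout=5)  # bounded wait instead of blocking forever
```

Because the sentinel travels through the same queue as the data, the consumer exits only after handling everything queued ahead of it; a running flag, by contrast, is only re-checked after get() returns.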

All threads are started as daemon:

t = Thread(target=self._loop)
t.setDaemon(True)
t.start()
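
Since Python 3.3 the daemon flag can also be passed to the Thread constructor, and Thread.setDaemon() has been deprecated since Python 3.10 in favour of the daemon attribute. A small sketch of the equivalent call, where idle is a hypothetical placeholder for a receive loop such as _loop:

```python
import threading
import time


def idle():
    # placeholder for a blocking receive loop
    time.sleep(60)


# Same effect as: t = Thread(target=idle); t.setDaemon(True); t.start()
t = threading.Thread(target=idle, daemon=True)
t.start()
# A daemon thread does not keep the interpreter alive: once every
# non-daemon thread has finished, the process exits and abandons it.
```

Per the threading documentation, daemon threads are stopped abruptly at shutdown, so any cleanup they would have performed themselves is skipped.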

In my main process, I use atexit, to perform clean-up tasks:

atexit.register(self.on_exit)
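
One ordering detail matters for this approach: during normal shutdown, CPython waits for all non-daemon threads to finish before it calls atexit handlers. A hook whose job is to stop a non-daemon thread therefore never runs, and the process hangs. A minimal sketch that keeps the hook reachable (assuming Python 3; work, consumer and on_exit are illustrative names, not the tool's):

```python
import atexit
import queue
import threading

work = queue.Queue()
handled = []


def consumer():
    # Stand-in for T1: handle messages until the None sentinel arrives
    for item in iter(work.get, None):
        handled.append(item)


# daemon=True: the interpreter does not wait for this thread at shutdown,
# so the atexit hook below is actually reached
worker = threading.Thread(target=consumer, daemon=True)
worker.start()


def on_exit():
    work.put(None)          # sentinel: ask the consumer to finish
    worker.join(timeout=5)  # bounded wait, so shutdown cannot hang here


atexit.register(on_exit)
```

With daemon=False the same program would hang at exit: the interpreter would wait for consumer(), which only stops after on_exit() has run.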

Those clean-up tasks are essentially the following:

1) set self.running in P1 to False and send None to Q1, so that the thread T1 finishes

self.running = False
self.Q1.put(None)

2) send a message to P2 via Q2 to inform this process that it is ending

self.Q2.put("stop")

3) In P2, react to the "stop" message and do what we did in P1

self.running = False
self.Q2.put(None)
self.Q3.put(None)
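
Stripped of signals, atexit and terminate(), steps 2 and 3 can be sketched across a real process boundary as below. This is a minimal illustration under assumptions (Python 3): server stands in for P2, its two threads for T2 and T3, and run_and_stop for P1's side; none of these names come from the tool.

```python
import multiprocessing as mp
import threading


def server(q2, q3):
    # Stand-in for P2: one receiver thread per inbound queue (T2 and T3)
    def receive(q):
        for msg in iter(q.get, None):
            if msg == "stop":
                # Step 3: wake both receivers with sentinels. q2 is this
                # thread's own queue, so it reads the None on its next get()
                q2.put(None)
                q3.put(None)

    threads = [threading.Thread(target=receive, args=(q,)) for q in (q2, q3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def run_and_stop():
    q2, q3 = mp.Queue(), mp.Queue()
    p = mp.Process(target=server, args=(q2, q3))
    p.start()
    q2.put("stop")      # Step 2: tell P2 to shut down
    p.join(timeout=10)  # wait for P2 to exit on its own instead of killing it
    return p.exitcode   # 0 means server() returned normally


if __name__ == "__main__":
    print(run_and_stop())
```

Note that P1 waits for P2 with join() instead of killing it, and that the process-starting code sits under an `if __name__ == "__main__":` guard, which the spawn start method requires.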

That's it, and as I understand it, that should make everything shut down nicely, but it doesn't.

The main code of P1 also contains the following endless loop, because otherwise the program would end prematurely:

while running:
    sleep(1)

Maybe that has something to do with the problem, but I cannot see why it should.

So what did I do wrong? Does my setup have major design flaws? Did I forget to shut down something?

EDIT

OK, I modified my code and managed to make it shut down correctly most of the time. Unfortunately, every now and then, it still gets stuck.

I managed to write a small working example of my code. To demonstrate what happens, simply start the script and then press Ctrl+C to stop it. It looks like the issue now usually appears if you press Ctrl+C as soon as possible after starting the tool.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import signal
import sys
import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


logger = logging.getLogger("mepy-client")


class SocketClientProtocol(object):

    def __init__(self, q_in, q_out, q_binary):
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        self.running = True
        t = Thread(target=self._loop)
        #t.setDaemon(True)
        t.start()
        t = Thread(target=self._loop_binary)
        #t.setDaemon(True)
        t.start()

    def _loop(self):
        print "start of loop 2"
        while self.running:
            res = self.q_in.get()
            if res is None:
                break
            self._handle_msg(res)
        print "end of loop 2"

    def _loop_binary(self):
        print "start of loop 3"
        while self.running:
            res = self.q_binary.get()
            if res is None:
                break
            self._handle_binary(res)
        print "end of loop 3"

    def _handle_msg(self, msg):
        msg_type = msg[0]
        if msg_type == "stop2":
            print "STOP RECEIVED"
            self.running = False
            self.q_in.put(None)
            self.q_binary.put(None)

    def _put_msg(self, msg):
        self.q_out.put(msg)

    def _handle_binary(self, data):
        pass

    def handle_element(self):
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    s = SocketClientProtocol(q_in, q_out, q_binary)
    while s.running:
        sleep(2)
        s.handle_element()


class MediatorSender(object):

    def __init__(self):
        self.q_in = None
        self.q_out = None
        self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        if self.running:
            return
        self.running = True
        self.q_in = Queue()
        self.q_out = Queue()
        self.q_binary = Queue()
        print "!!!!START"
        self.p = Process(target=run_twisted, args=(self.q_in, self.q_out, self.q_binary))
        self.p.start()
        t = Thread(target=self._loop)
        #t.setDaemon(True)
        t.start()

    def stop(self):
        print "!!!!STOP"
        if not self.running:
            return
        print "STOP2"
        self.running = False
        self.q_out.put(None)
        self.q_in.put(["stop2"])
        #self.q_in.put(None)
        #self.q_binary.put(None)

        try:
            if self.p and self.p.is_alive():
                self.p.terminate()
        except:
            pass

    def _loop(self):
        print "start of loop 1"
        while self.running:
            res = self.q_out.get()
            if res is None:
                break
            self._handle_msg(res)
        print "end of loop 1"

    def _handle_msg(self, msg):
        self._put_msg(msg)

    def _put_msg(self, msg):
        self.q_in.put(msg)

    def _put_binary(self, msg):
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        self._put_binary(chunk)

running = True
def signal_handler(signal, frame):
    global running
    if running:
        running = False
        ms.stop()
    else:
        sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    ms = MediatorSender()
    ms.start()
    for i in range(100):
        ms.send_chunk("some chunk of data")
    while running:
        sleep(1)

Answer

I think you're corrupting your multiprocessing.Queue by calling p.terminate() on the child process. The docs have a warning about this:

Warning: If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

In some cases, it looks like p is terminated before your MediatorSender._loop method can consume the None sentinel you put on q_out to tell it to exit.
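
The safer ordering is to hand the child a sentinel, give it time to exit, and fall back to terminate() only if it does not. A sketch assuming Python 3; worker, stop_worker and the 5-second default timeout are hypothetical choices, not part of the original code:

```python
import multiprocessing as mp


def worker(inbox):
    # Well-behaved child: exits as soon as it reads the None sentinel
    for _ in iter(inbox.get, None):
        pass


def stop_worker(proc, inbox, timeout=5.0):
    """Ask a child to exit via a sentinel; terminate() only as a last resort."""
    inbox.put(None)
    proc.join(timeout)
    if proc.is_alive():
        # The child ignored the request. terminate() may leave any queue it
        # was using corrupted, so such queues must not be reused afterwards.
        proc.terminate()
        proc.join()
    return proc.exitcode


if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    print(stop_worker(p, q))  # 0: the child exited on its own
```

The multiprocessing programming guidelines likewise advise avoiding Process.terminate(), since it can leave locks, pipes and queues the child was using broken or unavailable to other processes.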

Also, you're installing a signal handler that expects to run only in the main process, but SIGINT is actually received by both the parent and the child process. That means signal_handler gets called in both processes, which could result in ms.stop being called twice, due to a race condition in the way you handle setting ms.running to False.
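
A different way to keep Ctrl+C from being handled twice (an alternative, not what this answer goes on to recommend) is to have the child ignore SIGINT and let only the parent react, shutting the child down through its queue. A POSIX-only sketch under those assumptions; worker and demo are hypothetical names, and os.kill() stands in for the SIGINT that Ctrl+C delivers to the whole foreground process group:

```python
import multiprocessing as mp
import os
import signal
import time


def worker(inbox, ready):
    # Ignore Ctrl+C in the child; the parent alone decides when it stops
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    ready.set()  # tell the parent the new disposition is in place
    for _ in iter(inbox.get, None):
        pass


def demo():
    inbox, ready = mp.Queue(), mp.Event()
    p = mp.Process(target=worker, args=(inbox, ready))
    p.start()
    ready.wait(5)
    os.kill(p.pid, signal.SIGINT)  # simulate the SIGINT from Ctrl+C
    time.sleep(0.5)
    survived = p.is_alive()        # the child ignored the signal
    inbox.put(None)                # orderly shutdown through the queue
    p.join(5)
    return survived, p.exitcode


if __name__ == "__main__":
    print(demo())  # expected on POSIX: (True, 0)
```

The ready event closes the window between process start and the call to signal.signal(), during which the child would still use the SIGINT handling it started with.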

I would recommend simply exploiting the fact that both processes receive SIGINT, and having both the parent and the child handle KeyboardInterrupt directly. That way, each process shuts itself down cleanly, rather than the parent terminating the child. The following code demonstrates this, and in my testing it never hung. I've simplified your code in a few places, but functionally it's exactly the same:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep

logger = logging.getLogger("mepy-client")

class SocketClientProtocol(object):

    def __init__(self, q_in, q_out, q_binary):
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        t = Thread(target=self._loop)
        t.start()
        t = Thread(target=self._loop_binary)
        t.start()

    def _loop(self):
        print("start of loop 2")
        for res in iter(self.q_in.get, None):
            self._handle_msg(res)
        print("end of loop 2")

    def _loop_binary(self):
        print("start of loop 3")
        for res in iter(self.q_binary.get, None):
            self._handle_binary(res)
        print("end of loop 3")

    def _handle_msg(self, msg):
        msg_type = msg[0]
        if msg_type == "stop2":
            self.q_in.put(None)
            self.q_binary.put(None)

    def _put_msg(self, msg):
        self.q_out.put(msg)

    def stop(self):
        print("STOP RECEIVED")
        self.q_in.put(None)
        self.q_binary.put(None)

    def _handle_binary(self, data):
        pass

    def handle_element(self):
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    s = SocketClientProtocol(q_in, q_out, q_binary)
    try:
        while True:
            sleep(2)
            s.handle_element()
    except KeyboardInterrupt:
        s.stop()

class MediatorSender(object):

    def __init__(self):
        self.q_in = None
        self.q_out = None
        self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        if self.running:
            return
        self.running = True
        self.q_in = Queue()
        self.q_out = Queue()
        self.q_binary = Queue()
        print("!!!!START")
        self.p = Process(target=run_twisted, 
                         args=(self.q_in, self.q_out, self.q_binary))
        self.p.start()
        self.loop = Thread(target=self._loop)
        self.loop.start()

    def stop(self):
        print("!!!!STOP")
        if not self.running:
            return
        print("STOP2")
        self.running = False
        self.q_out.put(None)

    def _loop(self):
        print("start of loop 1")
        for res in iter(self.q_out.get, None):
            self._handle_msg(res)
        print("end of loop 1")

    def _handle_msg(self, msg):
        self._put_msg(msg)

    def _put_msg(self, msg):
        self.q_in.put(msg)

    def _put_binary(self, msg):
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        self._put_binary(chunk)

if __name__ == "__main__":
    ms = MediatorSender()
    try:
        ms.start()
        for i in range(100):
            ms.send_chunk("some chunk of data")
        # On Python 2.7 you actually have to join with a timeout in a
        # loop. If you just call join(), SIGINT won't be handled by the
        # main process, and the program will hang. This is a bug, and
        # it is fixed in Python 3.x.
        while ms.loop.is_alive():
            ms.loop.join(1)
    except KeyboardInterrupt:
        ms.stop()

If you prefer to use a signal handler rather than catching KeyboardInterrupt, you just need to make sure the child process uses its own signal handler, rather than inheriting the parent's:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import signal
import logging
from functools import partial
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep

logger = logging.getLogger("mepy-client")

class SocketClientProtocol(object):

    def __init__(self, q_in, q_out, q_binary):
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        self.running = True
        t = Thread(target=self._loop)
        t.start()
        t = Thread(target=self._loop_binary)
        t.start()

    def _loop(self):
        print("start of loop 2")
        for res in iter(self.q_in.get, None):
            self._handle_msg(res)
        print("end of loop 2")

    def _loop_binary(self):
        print("start of loop 3")
        for res in iter(self.q_binary.get, None):
            self._handle_binary(res)
        print("end of loop 3")

    def _handle_msg(self, msg):
        msg_type = msg[0]
        if msg_type == "stop2":
            self.q_in.put(None)
            self.q_binary.put(None)

    def _put_msg(self, msg):
        self.q_out.put(msg)

    def stop(self):
        print("STOP RECEIVED")
        self.running = False
        self.q_in.put(None)
        self.q_binary.put(None)

    def _handle_binary(self, data):
        pass

    def handle_element(self):
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    s = SocketClientProtocol(q_in, q_out, q_binary)
    signal.signal(signal.SIGINT, partial(signal_handler_child, s))
    while s.running:
        sleep(2)
        s.handle_element()

class MediatorSender(object):

    def __init__(self):
        self.q_in = None
        self.q_out = None
        self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        if self.running:
            return
        self.running = True
        self.q_in = Queue()
        self.q_out = Queue()
        self.q_binary = Queue()
        print("!!!!START")
        self.p = Process(target=run_twisted, 
                         args=(self.q_in, self.q_out, self.q_binary))
        self.p.start()
        self.loop = Thread(target=self._loop)
        self.loop.start()

    def stop(self):
        print("!!!!STOP")
        if not self.running:
            return
        print("STOP2")
        self.running = False
        self.q_out.put(None)

    def _loop(self):
        print("start of loop 1")
        for res in iter(self.q_out.get, None):
            self._handle_msg(res)
        print("end of loop 1")

    def _handle_msg(self, msg):
        self._put_msg(msg)

    def _put_msg(self, msg):
        self.q_in.put(msg)

    def _put_binary(self, msg):
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        self._put_binary(chunk)

def signal_handler_main(ms, *args):
    ms.stop()

def signal_handler_child(s, *args):
    s.stop()

if __name__ == "__main__":
    ms = MediatorSender()
    signal.signal(signal.SIGINT, partial(signal_handler_main, ms))
    ms.start()
    for i in range(100):
        ms.send_chunk("some chunk of data")
    while ms.loop.is_alive():
        ms.loop.join(9999999)  
    print('done main')
