Python3 Windows multiprocessing: passing a socket to a process

Question

I'm trying to make a multiprocessing ServerApp work on Windows. I guess the issue is the missing os.fork() feature, so I'll have to pass the socket somehow, and it is not picklable (?!).

I've seen that this might be possible using reduce_handle and rebuild_handle from multiprocessing.reduction, as shown here, but those functions are not available in Python 3 (?!). Although duplicate and steal_handle are available, I can't find an example of how to use them, and I don't know whether I need them at all.

Also, I'd like to know whether logging is going to be a problem when creating a new process.

Here's my ServerApp sample:

import logging
import socket

from select import select
from threading import Thread
from multiprocessing import Queue
from multiprocessing import Process
from sys import stdout
from time import sleep


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


    def conn_handler(self, connection, address, buffer):

        self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1])

        try:
            while True:

                command = None
                received_data = b''
                readable, writable, exceptional = select([connection], [], [], 0)  # Check for client commands

                if readable:
                    # Get Command  ... There is more code here
                    command = 'Something'


                if command == 'Something':
                    connection.sendall(command_response)
                else:
                    print(':(')

        except Exception as e:
            print(e)
        finally:
            connection.close()
            self.client_buffers.remove(buffer)
            self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1])


    def join(self):

        while self.listener.is_alive():
            self.listener.join(0.5)


    def acceptor(self):

        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process to handle it.
            conn, address = self.socket.accept()

            # Create a Queue that serves as the buffer for this client and add it to the list of all client buffers
            buffer = Queue()
            self.client_buffers.append(buffer)

            process = Process(target=self.conn_handler, args=(conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process has its own reference.
            conn.close()


    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048):

        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []


    def run(self):

        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        self.listener = Thread(target=self.acceptor)  # Run acceptor thread to handle new connection
        self.listener.daemon = True
        self.listener.start()


Recommended answer

To allow connection pickling (including sockets) in Python 3, you should use multiprocessing.allow_connection_pickling. It registers reducers for sockets in ForkingPickler. For instance:

import socket
import multiprocessing as mp
mp.allow_connection_pickling()


def _test_connection(conn):
    msg = conn.recv(2)
    conn.send(msg)
    conn.close()
    print("ok")

if __name__ == '__main__':
    server, client = socket.socketpair()

    p = mp.Process(target=_test_connection, args=(server,))
    p.start()

    client.settimeout(5)

    msg = b'42'
    client.send(msg)
    assert client.recv(2) == msg

    p.join()
    assert p.exitcode == 0

    client.close()
    server.close()

I also noticed that you have some other issues unrelated to the pickling of the socket.

  • When you use self.conn_handler as the target, multiprocessing will try to pickle the entire object self. This is an issue because your object contains a Thread, which cannot be pickled. You should thus remove self from the closure of your target function. This can be done by using the @staticmethod decorator and removing every mention of self in the function.
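The pickling problem can be reproduced without starting any process. The sketch below is only an illustration: Server, bound_handler, static_handler and can_pickle are hypothetical names, and plain pickle is used as a stand-in for the pickler that multiprocessing uses when it spawns a process.

```python
import pickle
from threading import Thread


class Server:
    """Toy stand-in for ServerApp: it owns a Thread, like the listener."""

    def __init__(self):
        self.listener = Thread(target=print)

    def bound_handler(self, client_id):
        # Pickling this bound method also pickles self, including the Thread.
        return client_id

    @staticmethod
    def static_handler(client_id):
        # Pickled by its qualified name only; no instance is involved.
        return client_id


def can_pickle(obj):
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


if __name__ == "__main__":
    server = Server()
    print("bound method picklable: ", can_pickle(server.bound_handler))
    print("static method picklable:", can_pickle(server.static_handler))
```

The static method works as a target because it is looked up by name in the child, so nothing held by the instance has to cross the process boundary. Anything the handler needs from self (such as the id) then has to be passed explicitly through args.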

Also, the logging module is not designed to handle multiple processes. Basically, all the logs from the launched Process will be lost with your current code. To fix that, you can either set up a new logger once the second Process has started (at the beginning of conn_handler) or use the multiprocessing logging utility.
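A minimal sketch of the first option, configuring logging at the very beginning of the child's target function. The helper setup_worker_logging, the function worker and the logger name "server.worker" are assumptions made for illustration; they are not part of the original code.

```python
import logging
import multiprocessing as mp
import sys


def setup_worker_logging(stream=None, level=logging.INFO):
    """Configure a logger inside the current process and return it."""
    logger = logging.getLogger("server.worker")  # hypothetical logger name
    logger.setLevel(level)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler(stream or sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(processName)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


def worker(client_id):
    # With the "spawn" start method the child is a fresh interpreter, so
    # handlers added at runtime in the parent are not carried over.
    # Logging is therefore configured here, inside the child.
    logger = setup_worker_logging()
    logger.info("[%d] - worker started", client_id)


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=worker, args=(0,))
    p.start()
    p.join()
```

Including %(processName)s in the format makes it easier to tell which process emitted each line when several children write to the same console.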

Putting these fixes together gives something like this:

import logging
import socket

from select import select
from threading import Thread
from multiprocessing import util, get_context
from sys import stdout
from time import sleep

util.log_to_stderr(20)
ctx = get_context("spawn")


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp',
                buffer_size=2048):

        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []

    @staticmethod
    def conn_handler(id, connection, address, buffer):

        print("test")
        util.info("[%d] - Connection from %s:%d", id, address[0], address[1])

        try:
            while True:

                command = None
                received_data = b''
                # Check for client commands
                readable, writable, exceptional = select([connection], [], [],
                                                        0)

                if readable:
                    # Get Command  ... There is more code here
                    command = 'Something'

                if command == 'Something':
                    connection.sendall(b"Coucouc")
                    break
                else:
                    print(':(')
                sleep(.1)

        except Exception as e:
            print(e)
        finally:
            connection.close()
            util.info("[%d] - Connection from %s:%d has been closed.", id,
                    address[0], address[1])
            print("Close")

    def join(self):

        while self.listener.is_alive():
            self.listener.join(0.5)

    def acceptor(self):

        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id,
                            self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process
            # to handle it.
            conn, address = self.socket.accept()

            # Create a Queue that serves as the buffer for this client and
            # add it to the list of all client buffers
            buffer = ctx.Queue()
            self.client_buffers.append(buffer)

            process = ctx.Process(target=self.conn_handler,
                                args=(self.id, conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process
            # has its own reference.
            conn.close()

    def run(self):

        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        # Run acceptor thread to handle new connection
        self.listener = Thread(target=self.acceptor)
        self.listener.daemon = True
        self.listener.start()

        self.listener.join()


def main():
    app = ServerApp(0)
    app.run()


if __name__ == '__main__':
    main()

I only tested it on Unix with Python 3.6, but the behavior should not be too different on Windows, as I use the spawn context, which should behave like Process does on Windows.
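To illustrate what the spawn start method implies on any platform, here is a small sketch (STATE and report_state are hypothetical names). Under spawn, the child re-imports the main module instead of inheriting the parent's memory, so when this is saved and run as a script the child should report the import-time value rather than the one assigned later in the parent.

```python
import multiprocessing as mp

STATE = "set at import time"


def report_state():
    """Print and return the module-level STATE as seen by this process."""
    print("process sees:", STATE)
    return STATE


if __name__ == "__main__":
    # This assignment happens only in the parent, after import.
    STATE = "changed in parent at runtime"

    ctx = mp.get_context("spawn")  # the start method Windows uses
    p = ctx.Process(target=report_state)
    p.start()
    p.join()

    report_state()  # the parent still sees its own modified value
```

This is why everything a child needs (the socket, the id, the address) has to be passed through args and be picklable: nothing set up at runtime in the parent is visible to the child otherwise.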
