Python3 Windows multiprocessing passing socket to process
Question
I'm trying to make a multiprocessing ServerApp work on Windows. I guess the issue is the missing os.fork() feature, so I'll have to pass the socket somehow, and it is not pickleable (?!).
I've seen that this might be possible using reduce_handle and rebuild_handle from multiprocessing.reduction, as shown here, but those functions are not available in Python 3 (?!). Although duplicate and steal_handle are available, I can't find an example of how to use them, and I don't know whether I need them at all.
Also, I'd like to know whether logging is going to be a problem when creating a new process.
Here's my ServerApp sample:
import logging
import socket

from select import select
from threading import Thread
from multiprocessing import Queue
from multiprocessing import Process
from sys import stdout
from time import sleep


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    def conn_handler(self, connection, address, buffer):
        self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1])
        try:
            while True:
                command = None
                received_data = b''
                readable, writable, exceptional = select([connection], [], [], 0)  # Check for client commands
                if readable:
                    # Get Command ... There is more code here
                    command = 'Something'

                if command == 'Something':
                    connection.sendall(command_response)
                else:
                    print(':(')
        except Exception as e:
            print(e)
        finally:
            connection.close()
            self.client_buffers.remove(buffer)
            self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1])

    def join(self):
        while self.listener.is_alive():
            self.listener.join(0.5)

    def acceptor(self):
        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process to handle it.
            conn, address = self.socket.accept()

            # Create Queue which will represent buffer for specific client and add it to the list of all client buffers
            buffer = Queue()
            self.client_buffers.append(buffer)

            process = Process(target=self.conn_handler, args=(conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process has its own reference.
            conn.close()

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048):
        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []

    def run(self):
        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        self.listener = Thread(target=self.acceptor)  # Run acceptor thread to handle new connection
        self.listener.daemon = True
        self.listener.start()
Recommended answer
To allow connection pickling (including sockets) in Python 3, you should call multiprocessing.allow_connection_pickling. It registers reducers for sockets in ForkingPickler. For instance:
import socket
import multiprocessing as mp
mp.allow_connection_pickling()


def _test_connection(conn):
    msg = conn.recv(2)
    conn.send(msg)
    conn.close()
    print("ok")


if __name__ == '__main__':
    server, client = socket.socketpair()

    p = mp.Process(target=_test_connection, args=(server,))
    p.start()

    client.settimeout(5)

    msg = b'42'
    client.send(msg)
    assert client.recv(2) == msg

    p.join()
    assert p.exitcode == 0

    client.close()
    server.close()
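To see why a socket-specific reducer is needed at all, the following minimal sketch checks whether the standard pickle module accepts a socket. The helper names plain_pickle_accepts and check_socket_pickling are hypothetical and only serve the illustration; the sketch says nothing about how ForkingPickler itself is implemented.

```python
import pickle
import socket


def plain_pickle_accepts(obj):
    """Return True if the standard pickler can serialize obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def check_socket_pickling():
    """Create a connected socket pair and try to pickle one end with plain pickle."""
    a, b = socket.socketpair()
    try:
        return plain_pickle_accepts(a)
    finally:
        # Always release both ends, whatever the outcome of the check.
        a.close()
        b.close()


if __name__ == "__main__":
    print("plain pickle accepts a socket:", check_socket_pickling())
```

Plain pickle refuses the socket, which is consistent with the point above: passing a socket to a Process only works through the reducers that multiprocessing uses for its own pickling.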
I also noticed that you have some other issues unrelated to the pickling of the socket.
When you use self.conn_handler as a target, multiprocessing will try to pickle the entire object self. This is an issue because your object contains a Thread, which cannot be pickled. You should therefore remove self from the closure of your target function. This can be done by using the @staticmethod decorator and removing every mention of self in the function.
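The effect can be reproduced in isolation. The sketch below uses a hypothetical ServerSketch class (not the original ServerApp) that holds a Thread attribute, and a hypothetical can_pickle helper, to compare pickling a bound method with pickling a static method.

```python
import pickle
from threading import Thread


def _noop():
    """Placeholder thread target; it is never started in this sketch."""


class ServerSketch:
    """Hypothetical stand-in for ServerApp: it owns a Thread, like the original."""

    def __init__(self):
        self.listener = Thread(target=_noop)

    def bound_handler(self, address):
        # Pickling this bound method requires pickling self, including the Thread.
        return address

    @staticmethod
    def static_handler(address):
        # No reference to self: only the function's qualified name is pickled.
        return address


def can_pickle(obj):
    """Return True if the standard pickler can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    app = ServerSketch()
    print("bound method picklable:", can_pickle(app.bound_handler))
    print("static method picklable:", can_pickle(ServerSketch.static_handler))
```

When run as a script, the bound method should be reported as not picklable because of the Thread stored on the instance, while the static method is pickled by reference only.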
Also, the logging module is not designed to handle multiple processes. Basically, all the logs from the launched Process will be lost with your current code. To fix that, you can either set up a new logger once the second Process has started (at the beginning of conn_handler) or use the multiprocessing logging utility.
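As a minimal sketch of the first option, the child process can configure its own logger as the first thing the target function does. The names setup_child_logger, worker, and the logger name "server_child" are hypothetical and chosen only for this illustration.

```python
import logging
import multiprocessing as mp
import sys


def setup_child_logger(name="server_child", level=logging.INFO):
    """Configure a logger in the current process (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # do not stack duplicate handlers on repeated calls
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(processName)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


def worker(client_id):
    # Logging is configured inside the child, so it does not depend on
    # handlers that were only set up in the parent process.
    logger = setup_child_logger()
    logger.info("[%d] - handler started", client_id)


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    process = ctx.Process(target=worker, args=(0,))
    process.start()
    process.join()
```

The second option mentioned above would instead rely on the logger that multiprocessing provides, for example through multiprocessing.log_to_stderr.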
Putting these changes together gives something like this:
import logging
import socket

from select import select
from threading import Thread
from multiprocessing import util, get_context
from sys import stdout
from time import sleep

util.log_to_stderr(20)
ctx = get_context("spawn")


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp',
                 buffer_size=2048):

        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []

    @staticmethod
    def conn_handler(id, connection, address, buffer):

        print("test")
        util.info("[%d] - Connection from %s:%d", id, address[0], address[1])

        try:
            while True:

                command = None
                received_data = b''
                # Check for client commands
                readable, writable, exceptional = select([connection], [], [],
                                                         0)

                if readable:
                    # Get Command ... There is more code here
                    command = 'Something'

                if command == 'Something':
                    connection.sendall(b"Coucouc")
                    break
                else:
                    print(':(')
                sleep(.1)

        except Exception as e:
            print(e)
        finally:
            connection.close()
            util.info("[%d] - Connection from %s:%d has been closed.", id,
                      address[0], address[1])
            print("Close")

    def join(self):

        while self.listener.is_alive():
            self.listener.join(0.5)

    def acceptor(self):

        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id,
                             self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process
            # to handle it.
            conn, address = self.socket.accept()

            # Create Queue which will represent buffer for specific client and
            # add it to the list of all client buffers
            buffer = ctx.Queue()
            self.client_buffers.append(buffer)

            process = ctx.Process(target=self.conn_handler,
                                  args=(self.id, conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process
            # has its own reference.
            conn.close()

    def run(self):

        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        # Run acceptor thread to handle new connection
        self.listener = Thread(target=self.acceptor)
        self.listener.daemon = True
        self.listener.start()
        self.listener.join()


def main():
    app = ServerApp(0)
    app.run()


if __name__ == '__main__':
    main()
I only tested it on Unix with Python 3.6, but the behavior should not be too different, since I use the spawn context, which should behave like Process on Windows.
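For readers who want to check which start method a context uses before relying on it, here is a small sketch. The function name describe_contexts is hypothetical; the calls themselves are the public multiprocessing functions get_context, get_all_start_methods, and get_start_method.

```python
import multiprocessing as mp


def describe_contexts():
    """Report the available start methods and the one used by a spawn context."""
    spawn_ctx = mp.get_context("spawn")
    return {
        # Start methods supported on the current platform.
        "available": mp.get_all_start_methods(),
        # Start method used by Process objects created from spawn_ctx.
        "spawn_ctx_method": spawn_ctx.get_start_method(),
    }


if __name__ == "__main__":
    info = describe_contexts()
    print("available start methods:", info["available"])
    print("context start method:", info["spawn_ctx_method"])
```

Using an explicit context, rather than changing the global start method, keeps the choice local to the code that creates the processes.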