如何在Python 3.7中向multiprocessing.connection.Client(..)添加超时? [英] How do I add a timeout to multiprocessing.connection.Client(..) in Python 3.7?
问题描述
我有两个正在运行的Python程序.程序A通过 multiprocessing 模块连接到程序B:
I've got two Python programs running. Program A connects to program B with the multiprocessing module:
# Connection code in program A
# -----------------------------
import multiprocessing
import multiprocessing.connection
...
connection = multiprocessing.connection.Client(
('localhost', 19191), # <- address of program B
authkey='embeetle'.encode('utf-8') # <- authorization key
)
...
connection.send(send_data)
recv_data = connection.recv()
它在大多数情况下都能正常运行.但是,有时程序B被冻结(细节无关紧要,但是通常在程序B的GUI生成模式窗口时发生).
冻结程序B时,程序A挂在以下行:
It works perfectly most of the time. However, sometimes program B is frozen (the details don't matter much, but it usually happens when the GUI from program B spawns a modal window).
While program B is frozen, program A hangs at the following line:
connection = multiprocessing.connection.Client(
('localhost', 19191), # <- address of program B
authkey='embeetle'.encode('utf-8') # <- authorization key
)
它一直在等待响应.我想放置一个 timeout 参数,但是对multiprocessing.connection.Client(..)
的调用没有一个.
It keeps waiting for a response. I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..)
does not have one.
如何在此处实现超时?
注释:
我正在使用Python 3.7
的Windows 10
计算机上.
Notes:
I'm working on a Windows 10
computer with Python 3.7
.
推荐答案
我想放置一个超时参数,但是对
multiprocessing.connection.Client(..)
的调用没有一个.如何在这里实施超时?
I would like to put a timeout parameter, but the call to
multiprocessing.connection.Client(..)
does not have one. How can I implement a timeout here?
查看源于Python 3.7中的multiprocessing.connection ,对于您的用例,Client()
函数是围绕SocketClient()
的相当简短的包装器,而该包装器又包装了Connection()
.
Looking at the source to multiprocessing.connection in Python 3.7, the Client()
function is a fairly brief wrapper around SocketClient()
for your use case, which in turn wraps Connection()
.
起初,编写一个执行相同功能的ClientWithTimeout
包装器看起来相当简单,但是还要在它为连接创建的套接字上调用settimeout()
.但是,这没有正确的效果,因为:
At first it looked fairly straightforward to write a ClientWithTimeout
wrapper that does the same thing, but additionally calls settimeout()
on the socket it creates for the connection. However, this does not have the correct effect, because:
-
Python通过使用
select()
和基础的非阻塞OS套接字实现其自己的套接字超时行为;此行为是settimeout()
配置的.
Python implements its own socket timeout behaviour by using
select()
and an underlying non-blocking OS socket; this behaviour is what is configured bysettimeout()
.
Connection
直接在OS套接字句柄上操作,通过在常规Python套接字对象上调用detach()
返回该套接字句柄.
Connection
operates directly on an OS socket handle, which is returned by calling detach()
on the normal Python socket object.
由于Python将OS套接字句柄设置为非阻塞模式,因此recv()
对其的调用将立即返回,而不是等待超时时间.
Since Python has set the OS socket handle to the non-blocking mode, recv()
calls on it return immediately rather than waiting for the timeout period.
但是,我们仍然可以使用底层的SO_RCVTIMEO
套接字选项在底层OS套接字句柄上设置接收超时.
However, we can still set a receive timeout on the underlying OS socket handle by using the low-level SO_RCVTIMEO
socket option.
因此,我的解决方案的第二个版本:
Hence the second version of my solution:
from multiprocessing.connection import Connection, answer_challenge, deliver_challenge
import socket, struct
def ClientWithTimeout(address, authkey, timeout):
with socket.socket(socket.AF_INET) as s:
s.setblocking(True)
s.connect(address)
# We'd like to call s.settimeout(timeout) here, but that won't work.
# Instead, prepare a C "struct timeval" to specify timeout. Note that
# these field sizes may differ by platform.
seconds = int(timeout)
microseconds = int((timeout - seconds) * 1e6)
timeval = struct.pack("@LL", seconds, microseconds)
# And then set the SO_RCVTIMEO (receive timeout) option with this.
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)
# Now create the connection as normal.
c = Connection(s.detach())
# The following code will now fail if a socket timeout occurs.
answer_challenge(c, authkey)
deliver_challenge(c, authkey)
return c
为简便起见,我假设参数与您的示例相同,即:
For brevity, I have assumed the parameters are as per your example, i.e.:
- 地址是一个元组(表示地址族是
AF_INET
). - authkey是一个字节字符串.
如果您需要处理这些假设不成立的情况,则需要从Client()
和SocketClient()
复制更多的逻辑.
If you need to handle cases where these assumptions don't hold then you will need to copy a little more logic from Client()
and SocketClient()
.
尽管我查看了multiprocessing.connection
源以了解如何执行此操作,但是我的解决方案并未使用任何私有实现细节. Connection
,answer_challenge
和deliver_challenge
都是API的公共部分和文档部分.因此,此功能应该可以安全地与multiprocessing.connection
的将来版本一起使用.
Although I looked at the multiprocessing.connection
source to find out how to do this, my solution does not use any private implementation details. Connection
, answer_challenge
and deliver_challenge
are all public and documented parts of the API. This function should therefore be be safe to use with future versions of multiprocessing.connection
.
请注意,并非所有平台都支持SO_RCVTIMEO
,但至少Windows,Linux和OSX上都存在. struct timeval
的格式也是特定于平台的.我假设这两个字段始终是本机unsigned long
类型.我认为这在常见平台上应该是正确的,但不能保证始终如此.不幸的是,Python当前没有提供独立于平台的方式来实现此目的.
Note that SO_RCVTIMEO
may not be supported on all platforms, but it is present on at least Windows, Linux and OSX. The format of struct timeval
is also platform-specific. I have assumed that the two fields are always of the native unsigned long
type. I think this should be correct on common platforms but it is not guaranteed to always be so. Unfortunately Python does not currently provide a platform-independent way to do this.
下面是一个测试程序,显示了此工作方式-假定以上代码已另存为client_timeout.py
.
Below is a test program which shows this working - it assumes the above code is saved as client_timeout.py
.
from multiprocessing.connection import Client, Listener
from client_timeout import ClientWithTimeout
from threading import Thread
from time import time, sleep
addr = ('localhost', 19191)
key = 'embeetle'.encode('utf-8')
# Provide a listener which either does or doesn't accept connections.
class ListenerThread(Thread):
def __init__(self, accept):
Thread.__init__(self)
self.accept = accept
def __enter__(self):
if self.accept:
print("Starting listener, accepting connections")
else:
print("Starting listener, not accepting connections")
self.active = True
self.start()
sleep(0.1)
def run(self):
listener = Listener(addr, authkey=key)
self.active = True
if self.accept:
listener.accept()
while self.active:
sleep(0.1)
listener.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.active = False
self.join()
print("Stopped listener")
return True
for description, accept, name, function in [
("ClientWithTimeout succeeds when the listener accepts connections.",
True, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
("ClientWithTimeout fails after 3s when listener doesn't accept connections.",
False, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
("Client succeeds when the listener accepts connections.",
True, "Client", lambda: Client(addr, authkey=key)),
("Client hangs when the listener doesn't accept connections (use ctrl-C to stop).",
False, "Client", lambda: Client(addr, authkey=key))]:
print("Expected result:", description)
with ListenerThread(accept):
start_time = time()
try:
print("Creating connection using %s... " % name)
client = function()
print("Client created:", client)
except Exception as e:
print("Failed:", e)
print("Time elapsed: %f seconds" % (time() - start_time))
print()
在Linux上运行此命令会产生以下输出:
Running this on Linux produces the following output:
Expected result: ClientWithTimeout succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using ClientWithTimeout...
Client created: <multiprocessing.connection.Connection object at 0x7fad536884e0>
Time elapsed: 0.003276 seconds
Stopped listener
Expected result: ClientWithTimeout fails after 3s when listener doesn't accept connections.
Starting listener, not accepting connections
Creating connection using ClientWithTimeout...
Failed: [Errno 11] Resource temporarily unavailable
Time elapsed: 3.157268 seconds
Stopped listener
Expected result: Client succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using Client...
Client created: <multiprocessing.connection.Connection object at 0x7fad53688c50>
Time elapsed: 0.001957 seconds
Stopped listener
Expected result: Client hangs when the listener doesn't accept connections (use ctrl-C to stop).
Starting listener, not accepting connections
Creating connection using Client...
^C
Stopped listener
这篇关于如何在Python 3.7中向multiprocessing.connection.Client(..)添加超时?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!