带有线程的Python超时上下文管理器 [英] Python timeout context manager with threads
问题描述
我有timeout
上下文管理器,可以与信号完美配合,但是由于信号仅在主线程中起作用,因此它在多线程模式下会引发错误.
I have timeout
context manager that works perfectly with signals but it raises error in multithread mode because signals work only in main thread.
def timeout_handler(signum, frame):
raise TimeoutException()
@contextmanager
def timeout(seconds):
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
我已经看到了timeout
的装饰器实现,但是我不知道如何在threading.Thread
派生的类中传递yield
.我的变体无法正常工作.
I've seen decorator implementation of timeout
but I don't know how to pass yield
inside class derived from threading.Thread
. My variant won't work.
@contextmanager
def timelimit(seconds):
class FuncThread(threading.Thread):
def run(self):
yield
it = FuncThread()
it.start()
it.join(seconds)
if it.isAlive():
raise TimeoutException()
推荐答案
如果上下文管理器保护的代码是基于循环的,请考虑按照人们处理线程杀死的方式进行处理.杀死另一个线程通常是不安全的,因此标准方法是让控制线程设置一个对工作线程可见的标志.辅助线程会定期检查该标志并彻底关闭自身.这是您可以执行类似于超时的操作的方法:
If the code guarded by the context manager is loop-based, consider handling this the way people handle thread killing. Killing another thread is generally unsafe, so the standard approach is to have the controlling thread set a flag that's visible to the worker thread. The worker thread periodically checks that flag and cleanly shuts itself down. Here's how you can do something analogous with timeouts:
class timeout(object):
def __init__(self, seconds):
self.seconds = seconds
def __enter__(self):
self.die_after = time.time() + self.seconds
return self
def __exit__(self, type, value, traceback):
pass
@property
def timed_out(self):
return time.time() > self.die_after
这是一个单线程用法示例:
Here's a single-threaded usage example:
with timeout(1) as t:
while True: # this will take a long time without a timeout
# periodically check for timeouts
if t.timed_out:
break # or raise an exception
# do some "useful" work
print "."
time.sleep(0.2)
和一个多线程的:
import thread
def print_for_n_secs(string, seconds):
with timeout(seconds) as t:
while True:
if t.timed_out:
break # or raise an exception
print string,
time.sleep(0.5)
for i in xrange(5):
thread.start_new_thread(print_for_n_secs,
('thread%d' % (i,), 2))
time.sleep(0.25)
这种方法比使用信号更具侵入性,但它适用于任意线程.
This approach is more intrusive than using signals but it works for arbitrary threads.
这篇关于带有线程的Python超时上下文管理器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!