Python中线程之间的通信(不使用全局变量) [英] Communication between threads in Python (without using Global Variables)
问题描述
假设我们有一个主线程为测试模块启动两个线程-"test_a"和"test_b". 无论测试模块线程已完成执行测试,遇到任何错误,警告或是否要更新其他信息,这两个测试模块线程均保持其状态.
主线程如何获取此信息并采取相应措施. 例如,如果"test_a"引发错误标志; "main"如何在存在错误之前知道并停止其余测试?
执行此操作的一种方法是使用全局变量,但这非常丑陋.很快.
显而易见的解决方案是共享某种可变变量,方法是将其传递给构造函数/开始处的线程对象/函数.
干净的方法是用适当的实例属性构建一个类.如果您使用的是threading.Thread
子类,则改为只是一个线程函数,通常可以使用子类本身作为粘贴这些属性的位置.但我会用list
来显示它,因为它更短:
def test_a_func(thread_state):
# ...
thread_state[0] = my_error_state
# ...
def main_thread():
test_states = [None]
test_a = threading.Thread(target=test_a_func, args=(test_states,))
test_a.start()
您还可以(并且通常希望)将Lock
或Condition
打包到可变状态对象中,以便可以在main_thread
和test_a
之间正确同步.
(另一种选择是使用queue.Queue
,os.pipe
等来传递信息,但是您仍然需要将该队列或管道传递给子线程,方法与以上).
但是,值得考虑的是您是否真的需要这样做.如果您将test_a
和test_b
视为作业",而不是线程功能",则可以仅在池中执行这些作业,然后让池处理将结果或错误传回.
例如:
try:
with concurrent.futures.ThreadPoolExecutor(workers=2) as executor:
tests = [executor.submit(job) for job in (test_a, test_b)]
for test in concurrent.futures.as_completed(tests):
result = test.result()
except Exception as e:
# do stuff
现在,如果test_a
函数引发异常,则主线程将获得该异常-并且,因为这意味着退出with
块,所有其他作业都被取消并被丢弃,并且工作器线程关闭.
如果您使用的是2.5-3.1,则没有内置的 How main thread can get access to this information and act accordingly.
For example, if " test_a" raised an error flag; How "main" will know and stop rest of the tests before existing with error ? One way to do this is using global variables but that gets very ugly.. Very soon. The obvious solution is to share some kind of mutable variable, by passing it in to the thread objects/functions at constructor/start. The clean way to do this is to build a class with appropriate instance attributes. If you're using a You can (and usually want to) also pack a (Another option is to use a However, it's worth considering whether you really need to do this. If you think of For example: Now, if the If you're using 2.5-3.1, you don't have 这篇关于Python中线程之间的通信(不使用全局变量)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!concurrent.futures
,但是您可以安装
threading.Thread
subclass, instead of just a thread function, you can usually use the subclass itself as the place to stick those attributes. But I'll show it with a list
just because it's shorter:def test_a_func(thread_state):
# ...
thread_state[0] = my_error_state
# ...
def main_thread():
test_states = [None]
test_a = threading.Thread(target=test_a_func, args=(test_states,))
test_a.start()
Lock
or Condition
into the mutable state object, so you can properly synchronize between main_thread
and test_a
.queue.Queue
, an os.pipe
, etc. to pass information around, but you still need to get that queue or pipe to the child thread—which you do in the exact same way as above.)
test_a
and test_b
as "jobs", rather than "thread functions", you can just execute those jobs on a pool, and let the pool handle passing results or errors back.try:
with concurrent.futures.ThreadPoolExecutor(workers=2) as executor:
tests = [executor.submit(job) for job in (test_a, test_b)]
for test in concurrent.futures.as_completed(tests):
result = test.result()
except Exception as e:
# do stuff
test_a
function raises an exception, the main thread will get that exception—and, because that means exiting the with
block, and all of the other jobs get cancelled and thrown away, and the worker threads shut down.concurrent.futures
built in, but you can install the backport off PyPI, or you can rewrite things around multiprocessing.dummy.Pool
. (It's slightly more complicated that way, because you have to create a sequence of jobs and call map_async
to get back an iterator over AsyncResult
objects… but really that's still pretty simple.)