Python中线程之间的通信(不使用全局变量) [英] Communication between threads in Python (without using Global Variables)

查看:580
本文介绍了Python中线程之间的通信(不使用全局变量)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我们有一个主线程为测试模块启动两个线程-"test_a"和"test_b". 无论测试模块线程已完成执行测试,遇到任何错误,警告或是否要更新其他信息,这两个测试模块线程均保持其状态.

主线程如何获取此信息并采取相应措施. 例如,如果"test_a"引发错误标志; "main"如何在存在错误之前知道并停止其余测试?

执行此操作的一种方法是使用全局变量,但这非常丑陋.很快.

解决方案

显而易见的解决方案是共享某种可变变量,方法是将其传递给构造函数/开始处的线程对象/函数.

干净的方法是用适当的实例属性构建一个类.如果您使用的是threading.Thread 子类,则改为只是一个线程函数,通常可以使用子类本身作为粘贴这些属性的位置.但我会用list来显示它,因为它更短:

def test_a_func(thread_state):
    # ...
    thread_state[0] = my_error_state
    # ...

def main_thread():
    test_states = [None]
    test_a = threading.Thread(target=test_a_func, args=(test_states,))
    test_a.start()

您还可以(并且通常希望)将LockCondition打包到可变状态对象中,以便可以在main_threadtest_a之间正确同步.

(另一种选择是使用queue.Queueos.pipe等来传递信息,但是您仍然需要将该队列或管道传递给子线程,方法与以上).


但是,值得考虑的是您是否真的需要这样做.如果您将test_atest_b视为作业",而不是线程功能",则可以仅在池中执行这些作业,然后让池处理将结果或错误传回.

例如:

try:
    with concurrent.futures.ThreadPoolExecutor(workers=2) as executor:
        tests = [executor.submit(job) for job in (test_a, test_b)]
        for test in concurrent.futures.as_completed(tests):
            result = test.result()
except Exception as e:
    # do stuff

现在,如果test_a函数引发异常,则主线程将获得该异常-并且,因为这意味着退出with块,所有其他作业都被取消并被丢弃,并且工作器线程关闭.

如果您使用的是2.5-3.1,则没有内置的concurrent.futures,但是您可以安装

How main thread can get access to this information and act accordingly. For example, if " test_a" raised an error flag; How "main" will know and stop rest of the tests before existing with error ?

One way to do this is using global variables but that gets very ugly.. Very soon.

解决方案

The obvious solution is to share some kind of mutable variable, by passing it in to the thread objects/functions at constructor/start.

The clean way to do this is to build a class with appropriate instance attributes. If you're using a threading.Thread subclass, instead of just a thread function, you can usually use the subclass itself as the place to stick those attributes. But I'll show it with a list just because it's shorter:

def test_a_func(thread_state):
    # ...
    thread_state[0] = my_error_state
    # ...

def main_thread():
    test_states = [None]
    test_a = threading.Thread(target=test_a_func, args=(test_states,))
    test_a.start()

You can (and usually want to) also pack a Lock or Condition into the mutable state object, so you can properly synchronize between main_thread and test_a.

(Another option is to use a queue.Queue, an os.pipe, etc. to pass information around, but you still need to get that queue or pipe to the child thread—which you do in the exact same way as above.)


However, it's worth considering whether you really need to do this. If you think of test_a and test_b as "jobs", rather than "thread functions", you can just execute those jobs on a pool, and let the pool handle passing results or errors back.

For example:

try:
    with concurrent.futures.ThreadPoolExecutor(workers=2) as executor:
        tests = [executor.submit(job) for job in (test_a, test_b)]
        for test in concurrent.futures.as_completed(tests):
            result = test.result()
except Exception as e:
    # do stuff

Now, if the test_a function raises an exception, the main thread will get that exception—and, because that means exiting the with block, and all of the other jobs get cancelled and thrown away, and the worker threads shut down.

If you're using 2.5-3.1, you don't have concurrent.futures built in, but you can install the backport off PyPI, or you can rewrite things around multiprocessing.dummy.Pool. (It's slightly more complicated that way, because you have to create a sequence of jobs and call map_async to get back an iterator over AsyncResult objects… but really that's still pretty simple.)

这篇关于Python中线程之间的通信(不使用全局变量)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆