Python ThreadPoolExecutor - is the callback guaranteed to run in the same thread as submitted func?

Question

In the ThreadPoolExecutor (TPE), is the callback always guaranteed to run in the same thread as the submitted function?

For example, I tested this with the following code. I ran it many times and it seemed like func and callback always ran in the same thread.

import concurrent.futures
import random
import threading
import time

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)

def func(x):
    time.sleep(random.random())
    # Report which worker thread ran the task.
    return threading.current_thread().name

def callback(future):
    time.sleep(random.random())
    x = future.result()
    cur_thread = threading.current_thread().name
    # Print only when the callback ran on a different thread than func.
    if cur_thread != x:
        print(cur_thread, x)

print('main thread: %s' % threading.current_thread())
for i in range(10000):
    future = executor.submit(func, i)
    future.add_done_callback(callback)

However, the check failed once I removed the time.sleep(random.random()) statements: at least a few func calls and their callbacks did not run in the same thread.

For a project that I am working on, the callback must always run on the same thread as the submitted function, so I wanted to be sure that this is guaranteed by TPE. (The results of the test without the random sleep also seemed puzzling.)

I looked at the source code for executors, and it does not seem to switch to the main thread before running the callback, but I wanted to be sure.

Answer

The documentation does not guarantee which thread callbacks run in. The only documented guarantee is that callbacks will be run in a thread belonging to the process that added the callback, but that could be any thread, since you're using a ThreadPoolExecutor instead of a ProcessPoolExecutor:

    "Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them."


In the current ThreadPoolExecutor implementation, the thread a callback executes in depends on the state of the Future at the time the callback is added, and whether or not the Future is cancelled. These are implementation details; you should not rely on them, as they may be different in different Python implementations or different versions, and they are subject to change without notice.

If you add the callback after the Future completes, the callback will execute in whatever thread called add_done_callback. You can see this by looking at the add_done_callback source:

def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)

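If the state of the Future indicates it's cancelled or finished, fn is called immediately in the current thread of execution. Otherwise, it's added to an internal list of callbacks to run when the Future is complete. This is most likely why your test behaves differently without the sleeps: func can finish before the main thread reaches add_done_callback, in which case the callback runs in the main thread.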

For example:

>>> import concurrent.futures, threading, time
>>> ex = concurrent.futures.ThreadPoolExecutor(max_workers=1)
>>> def func(*args):
...     time.sleep(5)
...     print("func {}".format(threading.current_thread()))
...
>>> def cb(a): print("cb {}".format(threading.current_thread()))
...
>>> fut = ex.submit(func)
>>> # about five seconds later, the worker thread prints:
func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>
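
Both orderings can also be reproduced deterministically in a script. The following is a minimal sketch that relies on the current CPython behaviour described here, not on any documented guarantee. The names run_demo, early_cb, late_cb and release are made up for illustration; a threading.Event holds func open so that one callback is registered before the future finishes and one after.

```python
import concurrent.futures
import threading


def run_demo():
    """Return the thread names seen by func, by each callback, and by the caller."""
    seen = {}
    release = threading.Event()  # keeps func running until we let it finish

    def func():
        release.wait()
        return threading.current_thread().name

    def early_cb(future):
        # Registered while the future is still pending or running.
        seen["early"] = threading.current_thread().name

    def late_cb(future):
        # Registered after the future has already finished.
        seen["late"] = threading.current_thread().name

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func)
        future.add_done_callback(early_cb)  # func cannot have finished yet
        release.set()                       # now allow func to return
        seen["func"] = future.result()      # blocks until the result is set
        future.add_done_callback(late_cb)   # future is finished: called right here
    # Leaving the with block joins the worker, so early_cb has run by now.
    seen["caller"] = threading.current_thread().name
    return seen


if __name__ == "__main__":
    for key, name in sorted(run_demo().items()):
        print(key, name)
```

On current CPython the "early" entry should match "func" (the worker thread) and the "late" entry should match "caller". The second case is the one the test in the question hits when func returns before the main thread calls add_done_callback.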

If a future is cancelled by a successful cancel call, then the thread performing the cancellation immediately invokes all callbacks:

def cancel(self):
    """Cancel the future if possible.
    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True
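
The cancellation path can be observed with a small script. This is a sketch under the same caveat (current CPython behaviour, an implementation detail); cancel_demo, blocker and release are hypothetical names. A single worker is kept busy so that the second future stays pending and can still be cancelled.

```python
import concurrent.futures
import threading


def cancel_demo():
    """Cancel a queued future and record which thread ran its callback."""
    seen = {}
    release = threading.Event()  # keeps the only worker busy

    def blocker():
        release.wait()

    def cb(future):
        seen["callback"] = threading.current_thread().name
        seen["cancelled"] = future.cancelled()

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        executor.submit(blocker)                 # occupies the single worker
        queued = executor.submit(lambda: None)   # waits in the queue behind blocker
        queued.add_done_callback(cb)
        seen["cancel_returned"] = queued.cancel()  # cb is invoked during this call
        seen["caller"] = threading.current_thread().name
    finally:
        release.set()                 # let blocker finish
        executor.shutdown(wait=True)
    return seen


if __name__ == "__main__":
    print(cancel_demo())
```

Here the callback should report the same thread name as the caller of cancel, because no worker ever touched the cancelled future.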

Otherwise, callbacks are invoked by the thread that executes the future's task.
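
Since none of this is guaranteed, one option when the follow-up work has to run on the same thread as func is to avoid add_done_callback and call the follow-up from inside the submitted callable. The sketch below assumes the follow-up only needs func's return value; submit_with_followup and followup are hypothetical helpers, not part of concurrent.futures.

```python
import concurrent.futures
import threading


def submit_with_followup(executor, func, followup, *args):
    """Submit func(*args) and run followup(result) on the same worker thread.

    Hypothetical helper for illustration only.
    """
    def task():
        result = func(*args)
        # Still inside the worker that ran func, so the thread cannot differ.
        return followup(result)

    return executor.submit(task)


def func(x):
    return x, threading.current_thread().name


def followup(result):
    x, func_thread = result
    return x, func_thread, threading.current_thread().name


if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [submit_with_followup(executor, func, followup, i)
                   for i in range(1000)]
        for future in futures:
            x, func_thread, followup_thread = future.result()
            if func_thread != followup_thread:
                print(x, func_thread, followup_thread)
```

With this approach nothing should be printed, with or without sleeps. Note that if func raises, followup is skipped and the exception is stored on the returned future, so error handling would have to be added inside task if the follow-up must always run.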
