Who runs the callback when using apply_async method of a multiprocessing pool?


Problem Description

I'm trying to understand a little bit of what's going on behind the scenes when using the apply_async method of a multiprocessing pool.

Who runs the callback method? Is it the main process that called apply_async?

Let's say I send out a whole bunch of apply_async commands with callbacks and then continue with my program. My program is still doing things when the apply_async calls start to finish. How does the callback get run by the "main process" while the main process is still busy with the script?

Here's an example.

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass  # busy-wait so the main process stays occupied while results come back

print 'Finished with the script'

The output is something like

PoolWorker-1 running func with arg 0
PoolWorker-2 running func with arg 1
PoolWorker-3 running func with arg 2
MainProcess going to sleep for a minute <-- main process is busy
PoolWorker-4 running func with arg 3
PoolWorker-1 running func with arg 4
PoolWorker-2 running func with arg 5
PoolWorker-3 running func with arg 6
PoolWorker-4 running func with arg 7
MainProcess running callback with arg 0 <-- main process running callback while it's still in the while loop!!
MainProcess running callback with arg 1
MainProcess running callback with arg 2
MainProcess running callback with arg 3
MainProcess running callback with arg 4
PoolWorker-1 running func with arg 8
...
Finished with script

How is MainProcess running the callback while it's in the middle of that while loop??

There is this statement about the callback in the documentation for multiprocessing.Pool that seems like a hint but I don't understand it.

apply_async(func[, args[, kwds[, callback]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

Solution

There is indeed a hint in the docs:

callback should complete immediately since otherwise the thread which handles the results will get blocked.

The callbacks are handled in the main process, but they're run in their own separate thread. When you create a Pool it actually creates a few Thread objects internally:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

The interesting thread for us is _result_handler; we'll get to why shortly.
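
As a quick sanity check, you can see these handler threads from the main process yourself. The underscored attributes are CPython implementation details rather than public API, so treat this only as an illustrative sketch; exact thread names vary between versions:

import multiprocessing
import threading

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    # _result_handler is a plain threading.Thread living in the main process
    # (an implementation detail, not public API)
    print(type(pool._result_handler))
    print([t.name for t in threading.enumerate()])  # MainThread plus the pool's handler threads
    pool.close()
    pool.join()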

Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

As you can see, the _set method is the one that ends up actually executing the callback passed in, assuming the task was successful. Also notice that the ApplyResult adds itself to the pool's cache dict (the cache argument) at the end of __init__.
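
To tie that back to the public API: in the version of the source quoted here, the condition variable that _set notifies is the same one that the result object's wait()/get() block on, so get() returns as soon as the result-handler thread has delivered the value. A minimal sketch using only the documented interface (square is just a stand-in function for illustration):

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    res = pool.apply_async(square, (3,))  # res is the ApplyResult/AsyncResult described above
    # get() blocks until _set() has stored the value and notified the condition,
    # i.e. until the result-handler thread has delivered the result
    print(res.get())  # prints 9
    pool.close()
    pool.join()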

Now, back to the _result_handler thread object. That object calls the _handle_results function, which looks like this:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

It's a loop that just pulls results from the children off the queue, finds the matching entry in cache, and calls _set, which executes our callback. It's able to run even while your script is stuck in its while loop, because it isn't running in the main thread.
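
One way to convince yourself of this is to have the callback report where it is running. This is only a small sketch of the behaviour described above; the exact thread name will vary, but it will be one of the pool's internal threads in the main process, not MainThread:

import multiprocessing
import threading

def callback(x):
    # Runs in the main process, but on the pool's internal result-handler
    # thread, not on the main thread.
    print('callback({}) in process {}, thread {}'.format(
        x, multiprocessing.current_process().name,
        threading.current_thread().name))

def func(x):
    return x

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    for a in range(5):
        pool.apply_async(func, (a,), callback=callback)
    pool.close()
    pool.join()  # the result-handler thread is joined here, so all callbacks have run by now

This is also why the docs warn that the callback should complete immediately: every callback runs on that single result-handler thread, so a callback that blocks delays delivery of every later result, including anything waiting on get().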
