ProcessPoolExecutor,BrokenProcessPool处理 [英] ProcessPoolExecutor, BrokenProcessPool handling
问题描述
在本文档中( https://pymotw.com/3/concurrent.futures/)它说:
ProcessPoolExecutor与ThreadPoolExecutor的工作方式相同,但是使用进程而不是线程。这允许使用CPU密集型操作一个单独的CPU,并且不受CPython解释器的全局解释器锁的阻止。
这听起来很棒!它还说:
This sounds great! It also says:
如果某个工作进程发生某种事情导致其意外退出,则ProcessPoolExecutor将被视为中断,并且不再安排任务。
这听起来很糟糕:(所以我想我的问题是:什么被认为是出乎意料?这是否仅表示退出信号是不是1吗?我可以安全地退出线程并仍然继续处理队列吗?示例如下:
This sounds bad :( So I guess my question is: What is considered "Unexpectedly?" Does that just mean the exit signal is not 1? Can I safely exit the thread and still keep processing a queue? The example is as follows:
from concurrent import futures
import os
import signal
with futures.ProcessPoolExecutor(max_workers=2) as ex:
print('getting the pid for one worker')
f1 = ex.submit(os.getpid)
pid1 = f1.result()
print('killing process {}'.format(pid1))
os.kill(pid1, signal.SIGHUP)
print('submitting another task')
f2 = ex.submit(os.getpid)
try:
pid2 = f2.result()
except futures.process.BrokenProcessPool as e:
print('could not start new tasks: {}'.format(e))
推荐答案
我没有看到它IRL,但是从代码来看,它看起来像返回的文件描述符不包含results_queue文件描述符。
I hadn't see it IRL, but from the code it looks like the returned file descriptors not contains the results_queue file descriptor.
fromcurrent.futures.process:
from concurrent.futures.process:
reader = result_queue._reader
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready: # <===================================== THIS
result_item = reader.recv()
else:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"A process in the process pool was "
"terminated abruptly while the future was "
"running or pending."
))
# Delete references to object. See issue16284
del work_item
等待
函数取决于系统,但假设使用Linux OS(在 multiprocessing.connection
,删除了所有与超时有关的代码):
the wait
function depends on system, but assuming linux OS (at multiprocessing.connection
, removed all timeout related code):
def wait(object_list, timeout=None):
'''
Wait till an object in object_list is ready/readable.
Returns list of those objects in object_list which are ready/readable.
'''
with _WaitSelector() as selector:
for obj in object_list:
selector.register(obj, selectors.EVENT_READ)
while True:
ready = selector.select(timeout)
if ready:
return [key.fileobj for (key, events) in ready]
else:
# some timeout code
这篇关于ProcessPoolExecutor,BrokenProcessPool处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!