'multiprocessing.resource_sharer'中的AttributeError'DupFd'| Python多处理+线程 [英] AttributeError 'DupFd' in 'multiprocessing.resource_sharer' | Python multiprocessing + threading

查看:94
本文介绍了'multiprocessing.resource_sharer'中的AttributeError'DupFd'| Python多处理+线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在多个执行I/O绑定任务的threading.Thread与多个执行CPU绑定任务的multiprocessing.Process之间进行通信.每当线程为某个进程找到工作时,它将与multiprocessing.Pipe(duplex=False)的发送端一起放在multiprocessing.Queue上.然后,这些过程将发挥自己的作用,并通过管道将结果发送回线程.此程序似乎在大约70%的情况下都有效,另外30%的我收到AttributeError: Can't get attribute 'DupFd' on <module 'multiprocessing.resource_sharer' from '/usr/lib/python3.5/multiprocessing/resource_sharer.py'>

I'm trying to communicate between multiple threading.Thread(s) doing I/O-bound tasks and multiple multiprocessing.Process(es) doing CPU-bound tasks. Whenever a thread finds work for a process, it will be put on a multiprocessing.Queue, together with the sending end of a multiprocessing.Pipe(duplex=False). The processes then do their part and send results back to the threads via the Pipe. This procedure seems to work in roughly 70% of the cases, the other 30% I receive an AttributeError: Can't get attribute 'DupFd' on <module 'multiprocessing.resource_sharer' from '/usr/lib/python3.5/multiprocessing/resource_sharer.py'>

要复制:

import multiprocessing
import threading
import time

def thread_work(work_queue, pipe):
    while True:
        work_queue.put((threading.current_thread().name,  pipe[1]))
        received = pipe[0].recv()
        print("{}: {}".format(threading.current_thread().name, threading.current_thread().name == received))
        time.sleep(0.3)

def process_work(work_queue):
    while True:
        thread, pipe = work_queue.get()
        pipe.send(thread)

work_queue = multiprocessing.Queue()
for i in range(0,3):
    receive, send = multiprocessing.Pipe(duplex=False)
    t = threading.Thread(target=thread_work, args=[work_queue, (receive, send)])
    t.daemon = True
    t.start()

for i in range(0,2):
    p = multiprocessing.Process(target=process_work, args=[work_queue])
    p.daemon = True
    p.start()

time.sleep(5)

我查看了多处理源代码,但无法理解为什么会发生此错误. 我尝试使用queue.Queue或带有duplex=True的管道(默认),但在错误中找不到模式.有人知道如何调试此方法吗?

I had a look in the multiprocessing source code, but couldn't understand why this error occurs. I tried using the queue.Queue, or a Pipe with duplex=True (default) but coudn't find a pattern in the error. Does anyone have a clue how to debug this?

推荐答案

您正在此处分叉已经是多线程的主进程.一般而言,这是有问题的.

You are forking an already multi-threaded main-process here. That is known to be problematic in general.

这实际上很容易出现问题(而不仅仅是在Python中).规则是分叉之后而不是之前的线程".否则,线程执行器使用的锁将在各个进程之间重复.如果其中一个进程在拥有该锁的同时死亡,则使用该锁的所有其他进程将死锁- Raymond Hettinger .

引发错误的原因显然是在子进程中管道的文件描述符复制失败.

Trigger for the error you get is apparantly that the duplication of the file-descriptor for the pipe fails in the child process.

要解决此问题,请在主进程仍为单线程的情况下创建您的子进程,或者使用另一个start_method来创建新的进程,例如"spawn"(在Windows上为默认值)或"forkserver",如果有的话.

To resolve this issue, either create your child-processes as long as your main-process is still single-threaded or use another start_method for creating new processes like 'spawn' (default on Windows) or 'forkserver', if available.

forkserver

forkserver

当程序启动并选择forkserver start方法时,服务器进程将启动.从那时起,每当需要一个新进程时,父进程就连接到服务器并请求它派生一个新进程. fork服务器进程是单线程的,因此使用os.fork()是安全的.没有多余的资源被继承.

When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.

在Unix平台上可用,该平台支持通过Unix管道传递文件描述符. 文档

Available on Unix platforms which support passing file descriptors over Unix pipes. docs

您可以使用以下方法指定另一种start_method:

You can specify another start_method with:

multiprocessing.set_start_method(方法) 设置用于启动子进程的方法.方法可以是"fork","spawn"或"forkserver".

multiprocessing.set_start_method(method) Set the method which should be used to start child processes. method can be 'fork', 'spawn' or 'forkserver'.

请注意,此方法最多应调用一次,并且应在主模块的if name ==' main '子句中进行保护. 文档

Note that this should be called at most once, and it should be protected inside the if name == 'main' clause of the main module. docs

有关特定start_method(在Ubuntu 18.04上)的基准,请参见此处.

For a benchmark of the specific start_methods (on Ubuntu 18.04) look here.

这篇关于'multiprocessing.resource_sharer'中的AttributeError'DupFd'| Python多处理+线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆