Can I somehow share an asynchronous queue with a subprocess?

Question

I would like to use a queue for passing data from a parent to a child process which is launched via multiprocessing.Process. However, since the parent process uses Python's new asyncio library, the queue methods need to be non-blocking. As far as I understand, asyncio.Queue is made for inter-task communication and cannot be used for inter-process communication. Also, I know that multiprocessing.Queue has the put_nowait() and get_nowait() methods but I actually need coroutines that would still block the current task (but not the whole process). Is there some way to create coroutines that wrap put_nowait()/get_nowait()? On another note, are the threads that multiprocessing.Queue uses internally compatible after all with an event loop running in the same process?
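
To make the idea concrete, here is a rough sketch of what I mean, in the same pre-async/await coroutine style used elsewhere in this post: poll the queue's non-blocking methods and suspend only the calling task between attempts (poll_get/poll_put are just illustrative names, not an existing API).

import asyncio
import queue  # multiprocessing.Queue raises queue.Empty / queue.Full

@asyncio.coroutine
def poll_get(mp_queue, poll_interval=0.05):
    # Suspend only the calling task while the queue is empty.
    while True:
        try:
            return mp_queue.get_nowait()
        except queue.Empty:
            yield from asyncio.sleep(poll_interval)

@asyncio.coroutine
def poll_put(mp_queue, item, poll_interval=0.05):
    # Likewise, wait for free space without blocking the whole process.
    while True:
        try:
            return mp_queue.put_nowait(item)
        except queue.Full:
            yield from asyncio.sleep(poll_interval)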

If not, what other options do I have? I know I could implement such a queue myself by making use of asynchronous sockets but I hoped I could avoid that…

I also considered using pipes instead of sockets but it seems asyncio is not compatible with multiprocessing.Pipe(). More precisely, Pipe() returns a tuple of Connection objects which are not file-like objects. However, asyncio.BaseEventLoop's add_reader()/add_writer() and connect_read_pipe()/connect_write_pipe() methods all expect file-like objects, so it is impossible to asynchronously read from/write to such a Connection. In contrast, the usual file-like objects that the subprocess package uses as pipes pose no problem at all and can easily be used in combination with asyncio.
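
To illustrate the mismatch, a quick check of what Pipe() actually returns (illustrative snippet only):

import multiprocessing

parent_conn, child_conn = multiprocessing.Pipe()
print(type(parent_conn))                # a Connection object, not a file object
print(hasattr(parent_conn, 'read'))     # False - no file-like read()/write()
print(hasattr(parent_conn, 'fileno'))   # True - only the raw descriptor is exposed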

UPDATE: I decided to explore the pipe approach a bit further: I converted the Connection objects returned by multiprocessing.Pipe() into file-like objects by retrieving the file descriptor via fileno() and passing it to os.fdopen(). Finally, I passed the resulting file-like object to the event loop's connect_read_pipe()/connect_write_pipe(). (There is some mailing list discussion on a related issue if someone is interested in the exact code.) However, read()ing the stream gave me an OSError: [Errno 9] Bad file descriptor and I didn't manage to fix this. Also considering the missing support for Windows, I will not pursue this any further.
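
The attempt looked roughly like the following sketch (a reconstruction of the general shape, not my exact code, which is in the mailing list thread mentioned above):

import asyncio
import multiprocessing
import os

@asyncio.coroutine
def read_from_pipe(loop):
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False)
    # Turn the Connection into a "file-like" object via its file descriptor...
    pipe_file = os.fdopen(recv_conn.fileno(), 'rb', 0)
    reader = asyncio.StreamReader(loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    yield from loop.connect_read_pipe(lambda: protocol, pipe_file)
    # ...but read()ing from it is where the "Bad file descriptor" error appeared.
    return (yield from reader.read(100))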

Answer

Here is an implementation of a multiprocessing.Queue object that can be used with asyncio. It provides the entire multiprocessing.Queue interface, with the addition of coro_get and coro_put methods, which are asyncio.coroutines that can be used to asynchronously get/put from/into the queue. The implementation details are essentially the same as the second example of my other answer: ThreadPoolExecutor is used to make the get/put asynchronous, and a multiprocessing.managers.SyncManager.Queue is used to share the queue between processes. The only additional trick is implementing __getstate__ to keep the object picklable despite using a non-picklable ThreadPoolExecutor as an instance variable.

import asyncio
from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)   

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        # Copy the dict so pickling doesn't clobber the live executor on the
        # sending side; the executor itself isn't picklable and is recreated
        # lazily via the _executor property.
        self_dict = self.__dict__.copy()
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" % 
                                    (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine    
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get() # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop = asyncio.get_event_loop()  # use the running loop instead of the global from __main__
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__  == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

Output:

Passing 3 to parent
Got 3 from worker
got 28 back from parent

As you can see, you can use the AsyncProcessQueue both synchronously and asynchronously, from either the parent or child process. It doesn't require any global state, and by encapsulating most of the complexity in a class, is more elegant to use than my original answer.

You'll probably be able to get better performance using sockets directly, but getting that working in a cross-platform way seems to be pretty tricky. This approach also has the advantage of being usable across multiple workers, and it won't require you to pickle/unpickle yourself, etc.
