Interprocess communication using asyncio?


Question


I have a set of CPU-intensive processes that once in a while depend on each other to proceed. So something like

def run():
  while True:
    do stuff
    wake up some other process
    wait for some other process to wake me up
    do stuff


Within each process I'd like to use async, so that I can always have an instance of run running while others are waiting to be woken up. Looking at the asyncio docs, the only IPC option in the "High-level APIs" section that I see uses sockets. I'd much rather use a pipe, which it looks like I can perhaps do with the low-level API, but that documentation is chock full of warnings that if you're just writing an application then it's a mistake to be using it. Can someone weigh in on the idiomatic thing to do here? (And also, speed is an important factor, so if there's some less-idiomatic-but-more-performant thing I'd like to know about that option as well.)

Answer


I would like to mention the aioprocessing library, as I have used it successfully in one of my projects. It provides an async interface to the multiprocessing primitives, including IPC ones such as Process, Pipe, Lock, Queue, etc. It does this with a thread pool:

    ...
    @staticmethod
    def coro_maker(func):
        def coro_func(self, *args, loop=None, **kwargs):
            return self.run_in_executor(
                getattr(self, func), *args, loop=loop, **kwargs
            )

        return coro_func
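What coro_maker amounts to can be reproduced with the stdlib alone. Here is a minimal sketch of that idea (my own illustration, not aioprocessing's actual code): a blocking multiprocessing.Queue.get is awaited via the default thread-pool executor.

```python
import asyncio
import multiprocessing as mp

def child(q):
    # Runs in a separate process; uses the blocking queue API directly.
    q.put("pong")

async def main():
    q = mp.Queue()
    p = mp.Process(target=child, args=(q,))
    p.start()
    loop = asyncio.get_running_loop()
    # Same idea as coro_maker above: the blocking q.get runs in the
    # default thread-pool executor, so the event loop stays free.
    msg = await loop.run_in_executor(None, q.get)
    p.join()
    return msg

if __name__ == "__main__":
    print(asyncio.run(main()))  # pong
```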


But to be honest, a lot depends on the problem being solved and on which tasks run concurrently, since intensive IPC under the async approach is less efficient than the synchronous approach due to the overhead of the event loop, the thread pool, etc. Sometimes it is better to make all IPC operations synchronous and put them in a separate thread. Again, it all depends on the problem and the environment. Below is a benchmark that is far from comprehensive, but it gives an approximate picture for the problem it solves (intensive exchange of buffers).
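The "all-sync IPC in a separate thread" variant mentioned above can be sketched like this (again my own illustration, under the assumption of one reader thread and one writer thread per pipe end): a dedicated thread does blocking recv on an mp.Pipe and hands each message to the event loop with call_soon_threadsafe.

```python
import asyncio
import multiprocessing as mp
import threading

def child(conn):
    # Child process: a purely synchronous echo loop over the pipe.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(msg)

def pump(conn, loop, out_q):
    # Dedicated thread: blocking recv; results are handed to the event
    # loop thread-safely, so the loop itself never blocks on the pipe.
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            break
        loop.call_soon_threadsafe(out_q.put_nowait, msg)

async def main():
    parent, childc = mp.Pipe()
    proc = mp.Process(target=child, args=(childc,))
    proc.start()
    childc.close()  # parent keeps only its own end of the pipe
    out_q = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(target=pump, args=(parent, loop, out_q), daemon=True).start()
    parent.send(b"ping")
    echoed = await out_q.get()
    parent.send(None)  # ask the child to exit
    proc.join()
    return echoed

if __name__ == "__main__":
    print(asyncio.run(main()))
```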

Benchmark results on my machine (seconds; lower is better):

Sync SimpleQueue:  1.4309470653533936
AioSimpleQueue:  12.32670259475708
AioQueue:  14.342737436294556
AioPipe:  11.747064590454102
subprocess pipe stream:  7.344956159591675
socket stream:  4.360717058181763

# main.py
import sys
import time
import asyncio
import aioprocessing as ap
import multiprocessing as mp
import proc

count = 5*10**4
data = b'*'*100


async def sync_simple_queue_func():
    out_ = mp.SimpleQueue()
    in_ = mp.SimpleQueue()
    p = ap.AioProcess(target=proc.start_sync_queue_func, args=(out_, in_))
    p.start()

    begin_ts = time.time()
    for i in range(count):
        out_.put(data)
        res = in_.get()
    print('Sync SimpleQueue: ', time.time() - begin_ts)
    out_.put(None)


async def simple_queue_func():
    out_ = ap.AioSimpleQueue()
    in_ = ap.AioSimpleQueue()
    p = ap.AioProcess(target=proc.start_queue_func, args=(out_, in_))
    p.start()

    begin_ts = time.time()
    for i in range(count):
        await out_.coro_put(data)
        res = await in_.coro_get()
    print('AioSimpleQueue: ', time.time() - begin_ts)
    await out_.coro_put(None)


async def queue_func():
    out_ = ap.AioQueue()
    in_ = ap.AioQueue()
    p = ap.AioProcess(target=proc.start_queue_func, args=(out_, in_))
    p.start()
    begin_ts = time.time()
    for i in range(count):
        await out_.coro_put(data)
        res = await in_.coro_get()
    print('AioQueue: ', time.time() - begin_ts)
    await out_.coro_put(None)


async def pipe_func():
    main_, child_ = ap.AioPipe()
    p = ap.AioProcess(target=proc.start_pipe_func, args=(child_,))
    p.start()
    begin_ts = time.time()
    for i in range(count):
        await main_.coro_send(data)
        res = await main_.coro_recv()
    print('AioPipe: ', time.time() - begin_ts)
    await main_.coro_send(None)
    await p.coro_join()


server = None
async def handle_child(reader, writer):
    begin_ts = time.time()
    for i in range(count):
        writer.write(data)
        res = await reader.read(len(data))
    print('socket stream: ', time.time() - begin_ts)
    writer.close()


async def socket_func():
    global server
    addr = ('127.0.0.1', 8888)
    server = await asyncio.start_server(handle_child, *addr)
    p = ap.AioProcess(target=proc.start_socket_func, args=(addr,))
    p.start()
    async with server:
        # Wait for the child to finish instead of serve_forever(),
        # so the benchmark script can exit once the exchange is done.
        await p.coro_join()

async def subprocess_func():
    prog = await asyncio.create_subprocess_shell(
        'python proc.py',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)

    begin_ts = time.time()
    for i in range(count):
        prog.stdin.write(data)
        res = await prog.stdout.read(len(data))
    print('subprocess pipe stream: ', time.time() - begin_ts)
    prog.stdin.close()


async def main():
    await sync_simple_queue_func()
    await simple_queue_func()
    await queue_func()
    await pipe_func()
    await subprocess_func()
    await socket_func()


asyncio.run(main())

# proc.py

import asyncio
import sys

import aioprocessing as ap


async def sync_queue_func(in_, out_):
    # Deliberately uses the blocking queue API; it is a coroutine only
    # so it can be launched via asyncio.run like the async variants.
    while True:
        n = in_.get()
        if n is None:
            return
        out_.put(n)


async def queue_func(in_, out_):
    while True:
        n = await in_.coro_get()
        if n is None:
            return
        await out_.coro_put(n)

async def pipe_func(child):
    while True:
        n = await child.coro_recv()
        if n is None:
            return
        await child.coro_send(n)

data = b'*' * 100

async def socket_func(addr):

    reader, writer = await asyncio.open_connection(*addr)
    while True:
        n = await reader.read(len(data))
        if not n:
            break
        writer.write(n)


def start_sync_queue_func(in_, out_):
    asyncio.run(sync_queue_func(in_, out_))

def start_queue_func(in_, out_):
    asyncio.run(queue_func(in_, out_))


def start_pipe_func(child):
    asyncio.run(pipe_func(child))


def start_socket_func(addr):
    asyncio.run(socket_func(addr))


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    dummy = asyncio.Protocol()
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)  # sets read_transport
    w_transport, _ = await loop.connect_write_pipe(lambda: dummy, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
    return reader, writer


async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(len(data))
        if not res:
            break
        writer.write(res)


if __name__ == "__main__":
    asyncio.run(main())

