什么样的问题(如果有的话)会有多处理相结合ASYNCIO? [英] What kind of problems (if any) would there be combining asyncio with multiprocessing?

查看:366
本文介绍了什么样的问题(如果有的话)会有多处理相结合ASYNCIO?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

由于几乎每个人都知道当他们第一次看到在Python线程,也使得生活悲惨的究竟是谁想要做处理并行人GIL - 或者至少给它一个机会。

As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel - or at least give it a chance.

我目前正在寻求实现像反应堆模式。有效地我要听传入套接字连接在一个线状,当有人试图连接,接受连接,并把它传递到另一个线程类进行处理。

I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.

我(还)没有肯定,我可能会面临什么样的负载。我知道有目前的设置对传入邮件2MB的上限。从理论上讲,我们可以得到每秒数千次(虽然我不知道是否实际上我们已经看到了这样的事情)。花费的时间处理邮件量不是的非常的重要,但显然更快的效果会更好。

I'm not (yet) sure what kind of load I might be facing. I know there is currently setup a 2MB cap on incoming messages. Theoretically we could get thousands per second (though I don't know if practically we've seen anything like that). The amount of time spent processing a message isn't terribly important, though obviously quicker would be better.

我一直在寻找进入反应堆模式,并使用(至少在测试)似乎工作就好了多处理库开发的一个小例子。然而,现在/我们很快就会有可用的 ASYNCIO 库,它会处理事件循环我

I was looking into the Reactor pattern, and developed a small example using the multiprocessing library that (at least in testing) seems to work just fine. However, now/soon we'll have the asyncio library available, which would handle the event loop for me.

有任何可能结合咬我 ASYNCIO 多处理

Is there anything that could bite me by combining asyncio and multiprocessing?

推荐答案

您应该能够安全地将 ASYNCIO 多处理没有太多的麻烦,但你不应该使用多处理直接。 的ASYNCIO 的大忌(和基于异步架构的任何其它事件循环)阻止事件循环。如果您尝试使用直接,任何你阻止的时间等待一个子进程,你会阻止事件循环。显然,这是不好的。

You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.

要避免这个的最简单的方法是使用<一href=\"https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.run_in_executor\"><$c$c>BaseEventLoop.run_in_executor执行中的函数<一个href=\"https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor\"><$c$c>concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor 是一个过程池实现的使用 multiprocessing.Process ,但 ASYNCIO 有内置的支持,它执行的功能,而不会阻塞事件循环。这里有一个简单的例子:

The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

对于大多数情况下,这是单纯的功能是不够好。如果你发现自己需要其他结构从,如队列事件管理​​等,有一个叫第三方库 aioprocessing (全面披露:我写的),提供了 ASYNCIO 所有的<$ C的兼容版本$ C>多数据结构。下面是一个例子demoing是:

For the majority of cases, this is function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it), that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demoing that:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

这篇关于什么样的问题(如果有的话)会有多处理相结合ASYNCIO?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆