What kind of problems (if any) would there be combining asyncio with multiprocessing?
Question
As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel - or at least give it a chance.
I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.
I'm not (yet) sure what kind of load I might be facing. I know there is currently a 2MB cap set up on incoming messages. Theoretically we could get thousands per second (though I don't know if practically we've seen anything like that). The amount of time spent processing a message isn't terribly important, though obviously quicker would be better.
I was looking into the Reactor pattern, and developed a small example using the multiprocessing library that (at least in testing) seems to work just fine. However, now/soon we'll have the asyncio library available, which would handle the event loop for me.
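The multiprocessing example the question mentions isn't shown. Purely for illustration, here is a minimal sketch of what such a design might look like: a listener accepts a connection, reads the message, and hands the bytes to a pool worker (all names here are hypothetical, not the asker's actual code):

```python
import socket
import multiprocessing

MAX_MESSAGE = 2 * 1024 * 1024  # the 2MB cap on incoming messages

def process_message(data):
    # Stand-in for the real per-message work (hypothetical).
    return data.upper()

def serve_one(listener, pool):
    # Accept a single connection, read the message, and hand the bytes
    # to a worker process; the result is sent back to the client.
    conn, _addr = listener.accept()
    with conn:
        data = conn.recv(MAX_MESSAGE)
        # pool.apply blocks the listener until the worker finishes -- fine
        # in a plain multiprocessing design, but (as the answer below
        # explains) exactly the kind of call asyncio must not make.
        result = pool.apply(process_message, (data,))
        conn.sendall(result)
```

To run it, bind and listen on a socket, create a multiprocessing.Pool, and call serve_one in a loop.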
Is there anything that could bite me by combining asyncio and multiprocessing?
Answer
You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.
The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(x)  # Pretend this is expensive calculations
    return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,))  # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
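Note that @asyncio.coroutine and yield from are the Python 3.4-era spellings; @asyncio.coroutine was removed in Python 3.11. On Python 3.5+ the same example can be written with async/await (a sketch with the same behavior; the sleep is shortened to 1 second just to keep the demo quick):

```python
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(x)  # Pretend this is an expensive calculation
    return x * 5

async def main():
    loop = asyncio.get_running_loop()
    executor = ProcessPoolExecutor()
    # Runs blocking_func in a worker process without blocking the event loop.
    out = await loop.run_in_executor(executor, blocking_func, 1)
    print(out)

if __name__ == "__main__":
    asyncio.run(main())
```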
For the majority of cases, this function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it) that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demoing that:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None)  # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        # asyncio.async() in the original; async became a keyword in Python 3.7
        asyncio.ensure_future(example(queue, event, lock)),
        asyncio.ensure_future(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
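Tying the answer back to the question's reactor design: an asyncio server can accept connections on the event loop and push each message's CPU-bound processing into a ProcessPoolExecutor via run_in_executor. A minimal sketch in modern async/await syntax (process_message, the host, and the port are hypothetical placeholders, not part of the original answer):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

MAX_MESSAGE = 2 * 1024 * 1024  # the question's 2MB cap on incoming messages

def process_message(data):
    # Stand-in for the real, CPU-heavy per-message work (hypothetical).
    return data.upper()

async def handle_client(executor, reader, writer):
    data = await reader.read(MAX_MESSAGE)
    loop = asyncio.get_running_loop()
    # The worker process does the heavy lifting; the event loop stays free
    # to accept and service other connections in the meantime.
    result = await loop.run_in_executor(executor, process_message, data)
    writer.write(result)
    await writer.drain()
    writer.close()

async def serve(host="127.0.0.1", port=9000):
    executor = ProcessPoolExecutor()
    server = await asyncio.start_server(
        lambda r, w: handle_client(executor, r, w), host, port)
    async with server:
        await server.serve_forever()
```

Start it with asyncio.run(serve()); each accepted connection is read on the event loop, while the parsing/processing of the payload happens in a pool worker.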