How to combine python asyncio with threads?


Question


I have successfully built a RESTful microservice with Python asyncio and aiohttp that listens to a POST event to collect realtime events from various feeders.

It then builds an in-memory structure to cache the last 24h of events in a nested defaultdict/deque structure.

Now I would like to periodically checkpoint that structure to disc, preferably using pickle.

Since the memory structure can be >100MB I would like to avoid holding up my incoming event processing for the time it takes to checkpoint the structure.

I'd rather create a snapshot copy (e.g. deepcopy) of the structure and then take my time to write it to disk and repeat on a preset time interval.

I have been searching for examples on how to combine threads (and is a thread even the best solution for this?) and asyncio for that purpose but could not find something that would help me.

Any pointers to get started are much appreciated!

Solution

It's pretty simple to delegate a method to a thread or sub-process using BaseEventLoop.run_in_executor:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x)  # This is some operation that is CPU-bound

async def main():
    loop = asyncio.get_running_loop()
    # Run cpu_bound_operation in the ProcessPoolExecutor.
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in the meantime.
    await loop.run_in_executor(p, cpu_bound_operation, 5)

# The __main__ guard matters here: on platforms that spawn worker
# processes, ProcessPoolExecutor re-imports this module in each child.
if __name__ == "__main__":
    p = ProcessPoolExecutor(2)  # Create a ProcessPool with 2 processes
    asyncio.run(main())

As for whether to use a ProcessPoolExecutor or ThreadPoolExecutor, that's kind of hard to say; pickling a large object will definitely eat some CPU cycles, which initially would make you think ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child, and then pickling it again so you can write it to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off using a ThreadPoolExecutor, even though you're going to take a performance hit because of the GIL.

That said, it's very simple to test both ways and find out for sure, so you might as well do that.
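
For the checkpointing scenario in the question, the pieces fit together roughly as below. This is a minimal sketch, not code from the answer: the cache layout, the save_snapshot helper, the file path, and the 60-second interval are all illustrative assumptions.

import asyncio
import copy
import pickle
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the question's nested defaultdict/deque cache.
cache = {"events": []}

def save_snapshot(snapshot, path):
    # Pickling and disk I/O run in a worker thread, so the event
    # loop stays free to keep handling incoming POST events.
    with open(path, "wb") as f:
        pickle.dump(snapshot, f, protocol=pickle.HIGHEST_PROTOCOL)

async def checkpoint_periodically(executor, interval=60):
    loop = asyncio.get_running_loop()
    while True:  # runs until the task is cancelled
        await asyncio.sleep(interval)
        # The deepcopy happens on the event loop thread, so it briefly
        # blocks event handling; that is the trade-off for getting a
        # consistent snapshot without locking.
        snapshot = copy.deepcopy(cache)
        await loop.run_in_executor(executor, save_snapshot, snapshot, "events.pickle")

async def main():
    executor = ThreadPoolExecutor(max_workers=1)
    await checkpoint_periodically(executor)

asyncio.run(main())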

