How to combine Celery with asyncio?


Question


How can I create a wrapper that makes celery tasks look like asyncio.Task? Or is there a better way to integrate Celery with asyncio?

@asksol, the creator of Celery, said this:

It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).

But I could not find any code examples specifically for the asyncio framework.

Solution

EDIT (01/12/2021): The previous answer (found at the bottom) didn't age well, so I have added a combination of possible solutions that may satisfy those still looking into how to co-use asyncio and Celery.

Let's quickly break up the use cases first (more in-depth analysis here: asyncio and coroutines vs task queues):

  • If the task is I/O bound then it tends to be better to use coroutines and asyncio.
  • If the task is CPU bound then it tends to be better to use Celery or other similar task management systems.

So it makes sense in the context of Python's "Do one thing and do it well" to not try and mix asyncio and celery together.
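To make that split concrete, here is a minimal, self-contained sketch (pure stdlib, no Celery) of the two kinds of workload: the I/O-bound coroutines cooperate on the event loop, while the CPU-bound function is handed to an executor so it does not block them. The executor here is only a stand-in for the role Celery plays across worker processes or machines, and note that a thread pool does not escape the GIL — it merely keeps the loop responsive:

```python
import asyncio

def cpu_bound(n):
    # CPU-bound: pure computation; it blocks whatever thread runs it
    return sum(i * i for i in range(n))

async def io_bound(delay, value):
    # I/O-bound: awaiting yields the event loop to other coroutines
    await asyncio.sleep(delay)
    return value

async def main():
    loop = asyncio.get_running_loop()
    # The I/O-bound coroutines run concurrently on the loop...
    ios = asyncio.gather(io_bound(0.1, 'a'), io_bound(0.1, 'b'))
    # ...while the CPU-bound call goes to the default thread pool
    # (a stand-in for what Celery would do in separate worker processes)
    heavy = loop.run_in_executor(None, cpu_bound, 10_000)
    return await ios, await heavy

results, total = asyncio.run(main())
print(results, total)
```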

BUT what happens in cases where we want to be able to run a method both asynchronously and as a Celery task? Then we have some options to consider:

  • The best example that I was able to find is the following: https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/ (and I just found out that it is @Franey's response):

    1. Define your async method.

    2. Use the async_to_sync function from asgiref's sync module to wrap the async method and run it synchronously inside a celery task:

      # tasks.py
      import asyncio
      from asgiref.sync import async_to_sync
      from celery import Celery
      
      app = Celery('async_test', broker='a_broker_url_goes_here')
      
      async def return_hello():
          await asyncio.sleep(1)
          return 'hello'
      
      
      @app.task(name="sync_task")
      def sync_task():
          async_to_sync(return_hello)()
      

  • A use case that I came upon in a FastAPI application was the reverse of the previous example:

    1. An intense CPU-bound process is hogging up the async endpoints.

    2. The solution is to refactor the async CPU-bound process into a celery task and pass a task instance for execution from the Celery queue.

    3. A minimal example for visualization of that case:

      import uvicorn
      
      from celery import Celery
      from fastapi import FastAPI
      
      app = FastAPI(title='Example')
      worker = Celery('worker', broker='a_broker_url_goes_here')
      
      @worker.task(name='cpu_bound')
      def cpu_bound_task():
          # Does stuff but let's simplify it
          print([n for n in range(1000)])
      
      @app.get('/calculate')
      async def calculate():
          # Hand the heavy work to Celery so the event loop stays free
          cpu_bound_task.delay()
          return {'status': 'submitted'}
      
      if __name__ == "__main__":
          uvicorn.run('main:app', host='0.0.0.0', port=8000)
      

  • Another solution seems to be what @juanra and @danius propose in their answers, but we have to keep in mind that performance tends to take a hit when we intermix sync and async execution, so those answers need monitoring before we decide to use them in a production environment.
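One way to intermix the two without stalling the loop is to hand Celery's blocking result retrieval to an executor. The helper below is a sketch under that assumption: `wait_for_celery` is a hypothetical name of my own, and it only presumes the standard `AsyncResult.get()` interface of a configured Celery app with a running worker.

```python
import asyncio

async def wait_for_celery(async_result, timeout=10):
    # AsyncResult.get() blocks the calling thread, so run it in the
    # default thread pool to keep the event loop responsive meanwhile.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, lambda: async_result.get(timeout=timeout)
    )

# Usage (assuming cpu_bound_task from the example above and a live worker):
#     result = await wait_for_celery(cpu_bound_task.delay())
```

This keeps the coroutine-side code readable, but every in-flight wait occupies a thread from the pool, which is part of the performance cost mentioned above.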

Finally, there are some ready-made solutions that I cannot vouch for (because I have not used them myself), but I will list them here:

  • Celery Pool AsyncIO, which seems to solve exactly what Celery 5.0 didn't, but keep in mind that it seems a bit experimental (version 0.2.0 as of today, 01/12/2021)
  • aiotasks claims to be "a Celery-like task manager that distributes Asyncio coroutines" but seems a bit stale (latest commit around 2 years ago)

Well, that didn't age so well, did it? Version 5.0 of Celery didn't implement asyncio compatibility, so we cannot know when, or if, this will ever be implemented... Leaving this here for legacy reasons (as it was the answer at the time) and for comment continuation.

That will be possible from Celery version 5.0 as stated on the official site:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. The next major version of Celery will support Python 3.5 only, where we are planning to take advantage of the new asyncio library.
  2. Dropping support for Python 2 will enable us to remove massive amounts of compatibility code, and going with Python 3.5 allows us to take advantage of typing, async/await, asyncio, and similar concepts there’s no alternative for in older versions.

The above was quoted from the previous link.

So the best thing to do is wait for version 5.0 to be distributed!

In the meantime, happy coding :)
