如何告诉芹菜工作者停止接受任务?如何检查芹菜工作者任务是否正在运行? [英] How do I tell celery worker to stop accepting tasks? How can I check if any celery worker tasks are running?

查看:143
本文介绍了如何告诉芹菜工作者停止接受任务?如何检查芹菜工作者任务是否正在运行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

场景:

  • 在由Python/Flask Web应用程序和使用Celery进行的后台任务组成的服务器上运行的系统
  • Web应用程序和celery工作者都作为新贵工作运行(Nginx后面的Web应用程序)
  • 使用以下脚本完成生产部署:

  • System running on a server consisting of a Python/Flask web application and background tasks using Celery
  • Both web application and celery workers are run as upstart jobs (Web app behind Nginx)
  • Deployment to production is done with a script that:

  • 停止暴发户
  • 将代码推送到服务器
  • 运行任何数据库迁移
  • 开始新贵工作

如何增强部署脚本,以便执行以下操作?

How can I enhance the deployment script so it does the following?:

  • 告诉芹菜工人停止接受任务
  • 等待直到当前正在运行的芹菜任务完成
  • 停止暴发户
  • 将代码推送到服务器
  • 运行任何数据库迁移
  • 开始新贵工作
  • Tell the celery worker to stop accepting tasks
  • Wait until any currently running celery tasks are finished
  • Stop the upstart jobs
  • Push code to server
  • Run any db migrations
  • Start the upstart jobs

推荐答案

作为部署的一部分运行的以下脚本解决了该问题:

The following script, run as part of the deployment solved the problem:

import time
from celery.app.control import Control
from myapp.tasks import celery # my application's Celery app

if __name__ == "__main__":
    control = Control(celery)
    control.cancel_consumer("celery") # queue name, must probably be specified once per queue, but my app uses a single queue

    inspect = control.inspect()
    while True:
        active = inspect.active()
        running_jobs = []
        for key, value in active.items():
            running_jobs.extend(value)
        if len(running_jobs) > 0:
            print("{} jobs running: {}".format(len(running_jobs), ", ".join(job["name"] for job in running_jobs)))
            time.sleep(10)
        else:
            print("No running jobs")
            break

这篇关于如何告诉芹菜工作者停止接受任务?如何检查芹菜工作者任务是否正在运行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆