如何设置 Celery 在运行我的任务之前调用自定义初始化函数? [英] How can I set up Celery to call a custom initialization function before running my tasks?

查看:19
本文介绍了如何设置 Celery 在运行我的任务之前调用自定义初始化函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 Django 项目,我正在尝试使用 Celery 提交任务进行后台处理(http://ask.github.com/celery/introduction.html).Celery 与 Django 集成良好,我已经能够提交我的自定义任务并返回结果.

I have a Django project and I'm trying to use Celery to submit tasks for background processing ( http://ask.github.com/celery/introduction.html ). Celery integrates well with Django and I've been able to submit my custom tasks and get back results.

唯一的问题是我找不到在守护进程中执行自定义初始化的理智方法.在开始处理任务之前,我需要调用一个加载大量内存的昂贵函数,我无法承受每次调用该函数的代价.

The only problem is that I can't find a sane way of performing custom initialization in the daemon process. I need to call an expensive function that loads a lot of memory before I start processing the tasks, and I can't afford to call that function every time.

有人遇到过这个问题吗?任何想法如何在不修改 Celery 源代码的情况下解决它?

Has anyone had this problem before? Any ideas how to work around it without modifying the Celery source code?

谢谢

推荐答案

您可以编写自定义加载器,也可以使用信号.

You can either write a custom loader, or use the signals.

Loaders 有 on_task_init 方法,在任务即将执行时调用,和 on_worker_init 由 celery+celerybeat 主进程调用.

Loaders have the on_task_init method, which is called when a task is about to be executed, and on_worker_init which is called by the celery+celerybeat main process.

使用信号可能是最简单的,可用的信号有:

Using signals is probably the easiest, the signals available are:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

当一个任务即将被worker(或本地)执行时调度如果使用 apply/或如果 CELERY_ALWAYS_EAGER 已设置).

Dispatched when a task is about to be executed by the worker (or locally if using apply/or if CELERY_ALWAYS_EAGER has been set).

task_postrun(task_id, task, args, kwargs, retval)在与上述相同的条件下执行任务后分派.

task_postrun(task_id, task, args, kwargs, retval) Dispatched after a task has been executed in the same conditions as above.

task_sent(task_id, task, args, kwargs, eta, taskset)

在应用任务时调用(不适合长时间运行的操作)

Called when a task is applied (not good for long-running operations)

0.9.x 中可用的其他信号(github 上的当前主分支):

Additional signals available in 0.9.x (current master branch on github):

  • worker_init()

在 celeryd 启动时调用(在任务初始化之前,所以如果在系统支持fork,任何内存变化都会复制到子进程工作进程).

Called when celeryd has started (before the task is initialized, so if on a system supporting fork, any memory changes would be copied to the child worker processes).

worker_ready()

在 celeryd 能够接收任务时调用.

Called when celeryd is able to receive tasks.

worker_shutdown()

在 celeryd 关闭时调用.

Called when celeryd is shutting down.

以下是在流程中第一次运行任务时预先计算某些内容的示例:

Here's an example precalculating something the first time a task is run in the process:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

如果您希望为所有任务运行该函数,只需跳过 sender 参数.

If you want the function to be run for all tasks, just skip the sender argument.

这篇关于如何设置 Celery 在运行我的任务之前调用自定义初始化函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆