在运行我的任务之前,如何设置Celery调用自定义初始化函数? [英] How can I set up Celery to call a custom initialization function before running my tasks?

查看:677
本文介绍了在运行我的任务之前,如何设置Celery调用自定义初始化函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Django项目,我试图使用Celery提交后台处理任务( http://ask.github.com/celery/introduction.html )。 Celery与Django集成很好,我已经能够提交自定义任务并获取结果。



唯一的问题是我找不到合适的方式在守护程序进程中执行自定义初始化。在开始处理任务之前,我需要调用一个昂贵的函数来加载大量内存,而且每次都无法调用该函数。



有没有人以前有这个问题吗?任何想法如何解决,而不修改Celery源代码?



谢谢

解决方案

您可以编写自定义加载器,也可以使用信号。



加载器具有 on_task_init 方法,当任务即将被执行时调用,
on_worker_init 由芹菜+ celerybeat主进程调用。



使用信号可能是最简单的,可用的信号是:



0.8.x:




  • task_prerun(task_id,task,args,kwargs)



    在任务即将由工作人员执行时分派(或本地使用
    ,如果使用 apply /或如果 CELERY_ALWAYS_EAGER 已设置)。


  • task_postrun(task_id,task,args,kwargs,retval)
    任务已经在与上述相同的条件下执行。


  • task_sent(task_id,task,args,kwargs,eta,taskset) / code>



    应用任务时调用(不适用于长时间运行的操作)




    • 0.9.x中的附加信号(github上的当前主分支):




      • worker_init()



        当celeryd启动时调用(在任务初始化之前,
        系统支持 fork ,任何内存更改将被复制到子
        工作进程)。


      • worker_ready()



        当celeryd能够接收任务时调用。


      • worker_shutdown()



        当celeryd关闭时调用




      这里是一个例子precalculati在进程中第一次执行任务时,会发生某些事情:

       从celery.task导入任务
      来自芹菜。注册表导入任务
      from celery.signals import task_prerun

      _precalc_table = {}

      class PowersOfTwo(Task):

      def run self,x):
      如果x在_precalc_table中:
      返回_precalc_table [x]
      else:
      返回x ** 2
      tasks.register(PowersOfTwo)


      def _precalc_numbers(** kwargs):
      如果不是_precalc_table:#它是空的,所以还没有生成
      对于我在范围(1024):
      _precalc_table [i] = i ** 2


      #需要使用注册的实例作为发件人参数。
      task_prerun.connect(_precalc_numbers,sender = tasks [PowerOfTwo.name])

      如果你想要为所有任务运行该功能,只需跳过发件人参数。


      I have a Django project and I'm trying to use Celery to submit tasks for background processing ( http://ask.github.com/celery/introduction.html ). Celery integrates well with Django and I've been able to submit my custom tasks and get back results.

      The only problem is that I can't find a sane way of performing custom initialization in the daemon process. I need to call an expensive function that loads a lot of memory before I start processing the tasks, and I can't afford to call that function every time.

      Has anyone had this problem before? Any ideas how to work around it without modifying the Celery source code?

      Thanks

      解决方案

      You can either write a custom loader, or use the signals.

      Loaders have the on_task_init method, which is called when a task is about to be executed, and on_worker_init which is called by the celery+celerybeat main process.

      Using signals is probably the easiest, the signals available are:

      0.8.x:

      • task_prerun(task_id, task, args, kwargs)

        Dispatched when a task is about to be executed by the worker (or locally if using apply/or if CELERY_ALWAYS_EAGER has been set).

      • task_postrun(task_id, task, args, kwargs, retval) Dispatched after a task has been executed in the same conditions as above.

      • task_sent(task_id, task, args, kwargs, eta, taskset)

        Called when a task is applied (not good for long-running operations)

      Additional signals available in 0.9.x (current master branch on github):

      • worker_init()

        Called when celeryd has started (before the task is initialized, so if on a system supporting fork, any memory changes would be copied to the child worker processes).

      • worker_ready()

        Called when celeryd is able to receive tasks.

      • worker_shutdown()

        Called when celeryd is shutting down.

      Here's an example precalculating something the first time a task is run in the process:

      from celery.task import Task
      from celery.registry import tasks
      from celery.signals import task_prerun
      
      _precalc_table = {}
      
      class PowersOfTwo(Task):
      
          def run(self, x):
              if x in _precalc_table:
                  return _precalc_table[x]
              else:
                  return x ** 2
      tasks.register(PowersOfTwo)
      
      
      def _precalc_numbers(**kwargs):
          if not _precalc_table: # it's empty, so haven't been generated yet
              for i in range(1024):
                  _precalc_table[i] = i ** 2
      
      
      # need to use registered instance for sender argument.
      task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
      

      If you want the function to be run for all tasks, just skip the sender argument.

      这篇关于在运行我的任务之前,如何设置Celery调用自定义初始化函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆