Is it possible to extend Celery so that results are stored in several MongoDB collections?
Question
I've started a new project and I want to make Celery save results to several MongoDB collections instead of one. Is there a way to do that through configs or do I need to extend Celery and Kombu to achieve that?
Recommended answer
You don't need to modify Celery; you can extend it. That's exactly what I did for one internal project. I didn't want to touch the standard results backend (Redis in my case), but I also wanted to store the tasks' state and results in MongoDB permanently, while enriching the state/results at the same time.
I ended up creating a little library with a class called TaskTracker that uses Celery's signals machinery to achieve the goal. The key parts of the implementation look like this:
import datetime

from celery import signals, states
from celery.exceptions import ImproperlyConfigured
from pymongo import MongoClient, ReturnDocument


class TaskTracker(object):
    """Track task processing and store the state in MongoDB."""

    def __init__(self, app):
        self.config = app.conf.get('task_tracker')
        if not self.config:
            raise ImproperlyConfigured('Task tracker configuration missing')
        self.tasks = set()
        self._mongo = None
        self._connect_signals()

    @property
    def mongo(self):
        # create client on first use to avoid 'MongoClient opened before fork.'
        # warning
        if self._mongo is None:
            self._mongo = self._connect_to_mongodb()
        return self._mongo

    def _connect_to_mongodb(self):
        client = MongoClient(self.config['mongodb']['uri'])
        # check connection / error handling
        # ...
        return client

    def _connect_signals(self):
        signals.task_received.connect(self._on_task_received)
        signals.task_prerun.connect(self._on_task_prerun)
        signals.task_retry.connect(self._on_task_retry)
        signals.task_revoked.connect(self._on_task_revoked)
        signals.task_success.connect(self._on_task_success)
        signals.task_failure.connect(self._on_task_failure)

    def _on_task_received(self, sender, request, **other_kwargs):
        # ignore tasks that were not registered via track()
        if request.name not in self.tasks:
            return

        collection = self.mongo \
            .get_database(self.config['mongodb']['database']) \
            .get_collection(self.config['mongodb']['collection'])

        # take one timestamp so both date fields agree
        now = datetime.datetime.now(datetime.timezone.utc)

        collection.find_one_and_update(
            {'_id': request.id},
            {
                '$setOnInsert': {
                    'name': request.name,
                    'args': request.args,
                    'kwargs': request.kwargs,
                    'date_received': now,
                    'job_id': request.message.headers.get('job_id')
                },
                '$set': {
                    'status': states.RECEIVED,
                    'root_id': request.root_id,
                    'parent_id': request.parent_id
                },
                '$push': {
                    'status_history': {
                        'date': now,
                        'status': states.RECEIVED
                    }
                }
            },
            upsert=True,
            return_document=ReturnDocument.AFTER)

    # similarly for other signals...

    def _on_task_prerun(self, sender, task_id, task, args, kwargs,
                        **other_kwargs):
        # ...
        pass

    def _on_task_retry(self, sender, request, reason, einfo, **other_kwargs):
        # ...
        pass

    # ...

    def track(self, task):
        """Set up tracking for given task."""
        # accept either task name or task instance (for use as a decorator)
        if isinstance(task, str):
            self.tasks.add(task)
        else:
            self.tasks.add(task.name)
        return task
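The handlers elided above would presumably follow the same pattern as `_on_task_received`: set the new status and append an entry to `status_history`. The following is only a sketch of how that shared update document might be built, not the original library's code. The helper name `build_status_update` and its parameters are hypothetical, and it uses plain status strings so that it runs without Celery or pymongo installed.

```python
import datetime


def build_status_update(status, extra_fields=None, now=None):
    """Build a MongoDB update document for a task status change.

    Hypothetical helper for illustration; not part of Celery or pymongo.
    """
    # allow the caller to inject a timestamp (useful in tests)
    now = now or datetime.datetime.now(datetime.timezone.utc)
    fields = {'status': status}
    fields.update(extra_fields or {})
    return {
        # overwrite the current status and any extra fields
        '$set': fields,
        # keep an append-only history of status changes
        '$push': {'status_history': {'date': now, 'status': status}},
    }


# what a success handler might pass as the update argument
update = build_status_update('SUCCESS', {'result': 42})
```

In a `task_success` handler, where Celery passes the task as `sender` and the return value as `result`, the document could then be applied with something like `collection.update_one({'_id': sender.request.id}, update)`, assuming the task ID is read from `sender.request.id`.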
Then you need to provide configuration for MongoDB. I use a YAML configuration file for Celery, so it looks like this:
# standard Celery settings...
# ...

task_tracker:
  # MongoDB database for storing task state and results
  mongodb:
    uri: "\
      mongodb://myuser:mypassword@\
      mymongo.mydomain.com:27017/?\
      replicaSet=myreplica&tls=true&connectTimeoutMS=5000&\
      w=1&wtimeoutMS=3000&readPreference=primaryPreferred&maxStalenessSeconds=-1&\
      authSource=mydatabase&authMechanism=SCRAM-SHA-1"
    database: 'mydatabase'
    collection: 'tasks'
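Since the tracker reads `uri`, `database` and `collection` from this section, it may be worth failing early when one of them is missing, instead of hitting a `KeyError` inside a signal handler. The following is a minimal sketch under that assumption. `validate_tracker_config` is a hypothetical helper, not something the original library is known to contain.

```python
def validate_tracker_config(config):
    """Return the 'mongodb' section, or raise ValueError naming missing keys.

    Hypothetical helper; `config` is the dict stored under 'task_tracker'.
    """
    mongodb = (config or {}).get('mongodb') or {}
    required = ('uri', 'database', 'collection')
    missing = [key for key in required if not mongodb.get(key)]
    if missing:
        raise ValueError(
            'task_tracker.mongodb is missing: ' + ', '.join(missing))
    return mongodb


# example with placeholder values
section = validate_tracker_config({
    'mongodb': {
        'uri': 'mongodb://localhost:27017/',
        'database': 'mydatabase',
        'collection': 'tasks',
    }
})
```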
In your tasks module, you just create an instance of the class, passing it your Celery app, and decorate your tasks:
import os

from celery import Celery
import yaml

from celery_common.tracking import TaskTracker  # my custom utils library

config_file = os.environ.get('CONFIG_FILE', default='/srv/celery/config.yaml')
with open(config_file) as f:
    config = yaml.safe_load(f) or {}

app = Celery(__name__)
app.conf.update(config)

tracker = TaskTracker(app)


@tracker.track
@app.task(name='mytask')
def mytask(myparam1, myparam2, *args, **kwargs):
    pass
Now your tasks' state and results are going to be tracked in MongoDB, separately from the standard results backend. If you need to store them in multiple databases or collections, you can adjust it a bit: create multiple TaskTracker instances and apply multiple decorators to your tasks.
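An alternative to several tracker instances, closer to the original question about multiple collections, might be to keep one tracker and choose the collection per task name. The sketch below is one possible shape for that and is not taken from the answer's library. `CollectionRouter`, its `routes` mapping and the collection name `mytask_results` are all hypothetical.

```python
class CollectionRouter:
    """Map task names to MongoDB collection names (hypothetical helper)."""

    def __init__(self, default_collection, routes=None):
        self.default_collection = default_collection
        # copy so later changes to the caller's dict do not affect routing
        self.routes = dict(routes or {})

    def collection_for(self, task_name):
        """Return the collection name for a task, or the default one."""
        return self.routes.get(task_name, self.default_collection)


# 'mytask' goes to its own collection, everything else to 'tasks'
router = CollectionRouter('tasks', {'mytask': 'mytask_results'})
```

Inside a handler such as `_on_task_received`, the fixed `self.config['mongodb']['collection']` lookup could then be replaced with `router.collection_for(request.name)`.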