Is it possible to extend Celery so results would be stored in several MongoDB collections?

Question

I've started a new project and I want to make Celery save results to several MongoDB collections instead of one. Is there a way to do that through configs or do I need to extend Celery and Kombu to achieve that?

Answer

You don't need to modify Celery; you can extend it. That's exactly what I did for an internal project. I didn't want to touch the standard result backend (Redis in my case), but I also wanted to persist the tasks' state and results in MongoDB, enhancing the state/results along the way.

I ended up creating a little library with a class called TaskTracker that uses Celery's signals machinery to achieve this. The key parts of the implementation look like this:

import datetime

from celery import signals, states
from celery.exceptions import ImproperlyConfigured
from pymongo import MongoClient, ReturnDocument

class TaskTracker(object):
    """Track task processing and store the state in MongoDB."""

    def __init__(self, app):
        self.config = app.conf.get('task_tracker')
        if not self.config:
            raise ImproperlyConfigured('Task tracker configuration missing')
        self.tasks = set()
        self._mongo = None

        self._connect_signals()

    @property
    def mongo(self):
        # create client on first use to avoid 'MongoClient opened before fork.'
        # warning
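        # (pymongo's MongoClient is not fork-safe, and Celery's default
        # prefork pool forks worker processes after the app module is
        # imported, hence the lazy initialization)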
        if not self._mongo:
            self._mongo = self._connect_to_mongodb()
        return self._mongo

    def _connect_to_mongodb(self):
        client = MongoClient(self.config['mongodb']['uri'])
        # check connection / error handling
        # ...
        return client

    def _connect_signals(self):
        signals.task_received.connect(self._on_task_received)
        signals.task_prerun.connect(self._on_task_prerun)
        signals.task_retry.connect(self._on_task_retry)
        signals.task_revoked.connect(self._on_task_revoked)
        signals.task_success.connect(self._on_task_success)
        signals.task_failure.connect(self._on_task_failure)

    def _on_task_received(self, sender, request, **other_kwargs):
        if request.name not in self.tasks:
            return

        collection = self.mongo \
            .get_database(self.config['mongodb']['database']) \
            .get_collection(self.config['mongodb']['collection'])
        collection.find_one_and_update(
            {'_id': request.id},
            {
                '$setOnInsert': {
                    'name': request.name,
                    'args': request.args,
                    'kwargs': request.kwargs,
                    'date_received': datetime.datetime.utcnow(),
                    'job_id': request.message.headers.get('job_id')
                },
                '$set': {
                    'status': states.RECEIVED,
                    'root_id': request.root_id,
                    'parent_id': request.parent_id
                },
                '$push': {
                    'status_history': {
                        'date': datetime.datetime.utcnow(),
                        'status': states.RECEIVED
                    }
                }
            },
            upsert=True,
            return_document=ReturnDocument.AFTER)

    # similarly for other signals...
    def _on_task_prerun(self, sender, task_id, task, args, kwargs,
                        **other_kwargs):
        # ...
        pass

    def _on_task_retry(self, sender, request, reason, einfo, **other_kwargs):
        # ...
        pass

    # ...

    def track(self, task):
        """Set up tracking for given task."""
        # accept either task name or task instance (for use as a decorator)
        if isinstance(task, str):
            self.tasks.add(task)
        else:
            self.tasks.add(task.name)
            return task
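
The elided handlers follow the same pattern. As an illustration only, a success handler could look roughly like the sketch below, as another method on TaskTracker (the result and date_done field names are my choice here, not fixed by the code above; the task_success signal is sent with the task instance as the sender, so the request is available on it):

    def _on_task_success(self, sender, result, **other_kwargs):
        if sender.name not in self.tasks:
            return

        collection = self.mongo \
            .get_database(self.config['mongodb']['database']) \
            .get_collection(self.config['mongodb']['collection'])
        collection.find_one_and_update(
            {'_id': sender.request.id},
            {
                '$set': {
                    'status': states.SUCCESS,
                    # assumes the result is BSON-serializable
                    'result': result,
                    'date_done': datetime.datetime.utcnow()
                },
                '$push': {
                    'status_history': {
                        'date': datetime.datetime.utcnow(),
                        'status': states.SUCCESS
                    }
                }
            })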

Then you need to provide the configuration for MongoDB. I use a YAML configuration file for Celery, so it looks like this:

# standard Celery settings...
# ...

task_tracker:
    # MongoDB database for storing task state and results
    mongodb:
        uri: "\
            mongodb://myuser:mypassword@\
            mymongo.mydomain.com:27017/?\
            replicaSet=myreplica&tls=true&connectTimeoutMS=5000&\
            w=1&wtimeoutMS=3000&readPreference=primaryPreferred&maxStalenessSeconds=-1&\
            authSource=mydatabase&authMechanism=SCRAM-SHA-1"
        database: 'mydatabase'
        collection: 'tasks'

In your tasks module, you just create an instance of the class, providing your Celery app, and decorate your tasks:

import os

from celery import Celery
import yaml

from celery_common.tracking import TaskTracker  # my custom utils library


config_file = os.environ.get('CONFIG_FILE', '/srv/celery/config.yaml')
with open(config_file) as f:
    config = yaml.safe_load(f) or {}

app = Celery(__name__)
app.conf.update(config)

tracker = TaskTracker(app)

@tracker.track
@app.task(name='mytask')
def mytask(myparam1, myparam2, *args, **kwargs):
    pass
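
Since track() also accepts a task name, a task can alternatively be registered without the decorator:

tracker.track('mytask')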

Now your tasks' state and results are going to be tracked in MongoDB, separately from the standard result backend. If you need to store them in multiple databases or collections, you can adjust the class a bit, create multiple TaskTracker instances, and stack their decorators on your tasks, as sketched below.
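
A rough sketch of that adjustment (the config_key parameter and the task_tracker_a/task_tracker_b config sections are hypothetical; TaskTracker as shown above would need to read its settings from the given key instead of the hard-coded 'task_tracker' one):

# each config section points at a different MongoDB database/collection
tracker_a = TaskTracker(app, config_key='task_tracker_a')
tracker_b = TaskTracker(app, config_key='task_tracker_b')

@tracker_a.track
@tracker_b.track
@app.task(name='myothertask')
def myothertask(myparam1, myparam2, *args, **kwargs):
    pass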
