Initializing a worker with arguments using Celery

Question

I'm having trouble finding out how to do something that seems like it should be relatively simple.

I'm using Celery 3.1 with Python 3 and want to initialize my workers with arguments so that they can use these details for setup.

Specifically: these workers will be consuming tasks that require interacting with a third-party API using authentication credentials. The worker must pass the authentication details to the API server before consuming any tasks (the authentication details are stored in cookies after the first authentication request).

I would like to pass these login credentials to the worker when it is started from the CLI. The worker should then authenticate with them and store the session for use when consuming later tasks (ideally in an attribute that can be accessed from tasks).

Is this possible with Celery?

As a side note, I have considered passing a requests.Session object (from the Python requests library) as a task argument, but that would require serializing it, which appears to be frowned upon.

Recommended answer

I would suggest using an abstract task base class and caching the requests.Session on it.

From the Celery docs:

A task is not instantiated for every request, but is registered in the task registry as a global instance.

This means that the __init__ constructor will only be called once per process, and that the task class is semantically closer to an Actor.

This can also be useful to cache resources...

import requests
from celery import Task

class APITask(Task):
    """API requests task class."""

    abstract = True

    # the cached requests.session object
    _session = None

    def __init__(self):
        # since this class is instantiated once per process, use this
        # method to initialize and cache resources like a requests.Session,
        # or use a property like the example below, which creates the
        # requests.Session only the first time it's accessed
        pass

    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            session.auth = ('user', 'pass')

            self._session = session

        return self._session
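
To see the caching behaviour in isolation, here is a minimal sketch that uses neither Celery nor requests. FakeSession and CachedResource are hypothetical stand-ins (they are not part of either library); the property follows the same pattern as APITask.session above.

```python
class FakeSession:
    """Hypothetical stand-in for requests.Session; counts how often it is built."""

    created = 0

    def __init__(self):
        FakeSession.created += 1
        self.auth = None


class CachedResource:
    """Hypothetical stand-in for the task class; mirrors APITask.session."""

    _session = None

    @property
    def session(self):
        if self._session is None:
            # first access: build the session and cache it on the instance
            session = FakeSession()
            session.auth = ('user', 'pass')
            self._session = session
        return self._session


# a single long-lived instance, as with a task registered once per process
task = CachedResource()
first = task.session
second = task.session  # served from the cache, no new FakeSession is built
```

Both accesses return the same object, so any cookies set by the first authenticated request would still be on the session for later calls in the same process. Each worker process holds its own instance, so under this pattern each process would authenticate once.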

Now when you create the tasks that will make API requests:

@app.task(base=APITask, bind=True)
def call_api(self, url):
    # self will refer to the task instance (because we're using bind=True)
    self.session.get(url)

You can also pass the API authentication options through the app.task decorator as an extra argument, which will be set on the __dict__ of the task, for example:

# pass a custom auth argument
@app.task(base=APITask, bind=True, auth=('user', 'pass'))
def call_api(self, url):
    pass

Then make the base class use the passed authentication options:

class APITask(Task):
    """API requests task class."""

    abstract = True

    # the cached requests.session object
    _session = None

    # the API authentication
    auth = ()

    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            # use the authentication that was passed to the task
            session.auth = self.auth

            self._session = session

        return self._session

You can read more about this on the Celery docs site.

Now back to your original question, which is about passing extra arguments to the worker from the command line:

There is a section about this in the Celery docs, "Adding new command-line options". Here's an example of passing a username and a password to the worker from the command line:

$ celery worker -A appname --username user --password pass

Code:

from celery import bootsteps
from celery.bin import Option


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.')
)

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.')
)


class CustomArgs(bootsteps.Step):

    def __init__(self, worker, api_username, api_password, **options):
        # store the api authentication
        APITask.auth = (api_username, api_password)


app.steps['worker'].add(CustomArgs)
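
The bootstep above relies on auth being a class attribute: assigning APITask.auth before the first access to session changes what the task instances see. Here is a minimal sketch of that ordering without Celery or requests; FakeSession, BaseTask and CallApi are hypothetical stand-ins, and the assumption is that no task has touched session before the worker options are processed.

```python
class FakeSession:
    """Hypothetical stand-in for requests.Session."""

    def __init__(self):
        self.auth = None


class BaseTask:
    """Hypothetical stand-in for APITask, without Celery."""

    _session = None
    auth = ()

    @property
    def session(self):
        if self._session is None:
            session = FakeSession()
            # read at first access, so a later class-level assignment is seen
            session.auth = self.auth
            self._session = session
        return self._session


class CallApi(BaseTask):
    """Stands in for a task declared with base=APITask."""


# the task instance may already exist when the worker parses its options
task = CallApi()

# what CustomArgs.__init__ does with the parsed --username / --password
BaseTask.auth = ('user', 'pass')

# first access happens afterwards, so the session picks up the credentials
session_auth = task.session.auth
```

One caveat under the same assumptions: a task that sets its own auth (for example through the decorator argument shown earlier) shadows the class-level value, so the command-line credentials would not reach that task.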
