Initializing Different Celery Workers with Different Values


Problem Description


I am using celery to run long running tasks on Hadoop. Each task executes a Pig script on Hadoop which runs for about 30 mins - 2 hours.

My current Hadoop setup has four queues: a, b, c, and default. All tasks are currently executed by a single worker, which submits each job to a single queue.


I want to add 3 more workers which submit jobs to other queues, one worker per queue.

The problem is that the queue is currently hard-coded, and I wish to make it configurable per worker.


I searched a lot but I am unable to find a way to pass each celery worker a different queue value and access it in my task.


I start my celery worker like so.

celery -A app.celery worker

I wish to pass some additional arguments on the command line itself and access them in my task, but celery complains that it doesn't understand my custom argument.


I plan to run all the workers on the same host by setting the --concurrency=3 parameter. Is there any solution to this problem?

Thanks!

EDIT

The current scenario is like this. Every time I try to execute the task print_something by calling tasks.print_something.delay(), it only prints queue C.

@celery.task()
def print_something():
    print("C")

I need the workers to print a variable letter based on the value I pass to them when starting them.

@celery.task()
def print_something():
    print("<Variable Value Per Worker Here>")


Answer

Hope this helps someone.

There were multiple issues to solve.


The first step involved adding support in celery for the custom parameter. If this is not done, celery will complain that it doesn't understand the parameter.


Since I am running celery with Flask, I initialize celery like so.

from celery import Celery

def configure_celery():
    # `app` here is the Flask application instance
    app.config.update(
        CELERY_BROKER_URL='amqp://:@localhost:5672',
        RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'
    )
    celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

I call this function to initialize celery and store the result in a variable called celery.

celery = configure_celery()


To add the custom parameter you need to do the following.

def add_hadoop_queue_argument_to_worker(parser):
    parser.add_argument(
        '--hadoop-queue', help='Hadoop queue to be used by the worker'
    )
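As an aside, the callback above receives a standard argparse parser (in Celery 4.x), so its mechanics can be checked standalone; note that the dashed flag surfaces under the underscored attribute name. This snippet is purely illustrative and runs outside Celery:

```python
import argparse

# Mimic what Celery hands to the callback: a plain ArgumentParser.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--hadoop-queue', help='Hadoop queue to be used by the worker'
)

# argparse converts --hadoop-queue into the attribute hadoop_queue
args = parser.parse_args(['--hadoop-queue', 'a'])
print(args.hadoop_queue)
```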

The celery instance used below is the one obtained in the steps above.

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)


The next step would be to make this argument accessible in the worker. To do that follow these steps.

from celery import bootsteps

class HadoopCustomWorkerStep(bootsteps.StartStopStep):

    def __init__(self, worker, **kwargs):
        # Celery passes the parsed --hadoop-queue value as a keyword argument;
        # stash it on the app so tasks can reach it via self.app
        worker.app.hadoop_queue = kwargs['hadoop_queue']

Inform celery to use this class when creating the workers.

celery.steps['worker'].add(HadoopCustomWorkerStep)

The tasks should now be able to access the variable.

@celery.task(bind=True)
def print_hadoop_queue_from_config(self):
    print(self.app.hadoop_queue)


Verify it by running the worker on the command-line.

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
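Once the queue name is available on the app, the original Pig submission can consume it. A hypothetical sketch — the function name and script path are assumptions, though mapreduce.job.queuename is the standard Hadoop property for selecting a YARN queue:

```python
def build_pig_command(script_path, hadoop_queue):
    # Build the pig invocation, routing the job to the worker's queue.
    # mapreduce.job.queuename is the standard Hadoop property for this;
    # the rest of the command is illustrative.
    return [
        'pig',
        '-Dmapreduce.job.queuename={}'.format(hadoop_queue),
        '-f', script_path,
    ]

# Inside a bound task, the queue would come from self.app.hadoop_queue, e.g.:
#   subprocess.check_call(build_pig_command('script.pig', self.app.hadoop_queue))
print(build_pig_command('script.pig', 'a'))
```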

