暂停芹菜任务 [英] Pause celery task

查看:54
本文介绍了暂停芹菜任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正尝试通过单击用户按钮来暂停芹菜任务.

I'm trying to pause a celery task temporary based on user button click.

我所做的是:

用户单击按钮时;我发布了一个AJAX请求,将我的芹菜任务状态更新为"PAUSE"

When a user clicks a button; I release an AJAX request that updates my celery task state to "PAUSE"

然后;我的策略是当我开始做一项芹菜任务时;它运行一个for循环.每个for循环;我读取了数据库的状态",然后查看是否将其设置为暂停":如果设置为暂停";我想将其休眠60秒钟,或者直到用户单击恢复"按钮时才休眠.相同的想法.

Then; my tactic was to; when I initate a task into celery; it runs a for loop. Every for loop; I read my database 'state' and see if it's set to PAUSE: if it is set to pause; I want to sleep it for 60 seconds or sleep it until user hits resume button; same idea.

这是我的代码:

r  = redis.StrictRedis(host='localhost', port=6379, db=0)

@celery.task(bind=True)
def runTask(self, arr)

   for items in arr:
      current_task_id = self.request.id
      item = r.get('celery-task-meta-'+current_task_id)
      load_as_json = json.loads(item)
      if "PAUSE" in load_as_json['status']:
        sleep(50) 




@app.route('/start')
def start_task()
   runTask.apply_async(args=[arr])
   return 'task started running

这是我的暂停API端点的样子:

Here is how my pause API endpoint looks like:

@app.route('/stop/<task_id>')
def updateTaskState():
  task_id = request.cookie.get('task_id')
  loadAsJson = json.loads(r.get('celery-task-meta-'+str(task_id)))
  loadAsJson['status'] = 'PAUSE'
  loadAsJson.update(loadAsJson)
  dump_as_json = json.dumps(loadAsJson)
  updated_state = r.set('celery-task-meta-'+last_key, dump_as_json)
  return 'updated state';

根据我的概念了解;是因为我看不到更新状态是因为;该任务已经执行,无法从数据库中检索更新的值.仅供参考:任务更新状态立即设置为暂停";我通过创建一个单独的脚本来检查这一点,该脚本检查while循环内的状态;每次我单击释放AJAX请求的按钮以更新状态;我的数据库被更新,并且在单独的脚本上显示"PAUSE";但是在@ celery.task装饰器中,我似乎无法获取更新状态.

From what I conceptually understand; is that the reason why I'm not seeing an updated state is because; the task is already executed and isnt able to retrieve updated values from database. FYI: The task update state is set to PAUSE immediately; I checked this by creating a seperate script that checks state within while loop; everytime I click the button that release AJAX request to update the state; my db gets updated and it reads "PAUSE" on the seperate script; however within the @celery.task decorator I can't seem to get the updated state.

下面是我用来测试的单独脚本;似乎处于预期的更新状态;我只是无法在任务装饰器中获取更新的状态……很奇怪.

Below is my seperate script I used to test; and it seems to be updatign state as expected; I just can't get the updated state within task decorator... weirdly.

r  = redis.StrictRedis(host='localhost', port=6379, db=0)
last_key = r.keys()
while True:
    response = r.get('celery-task-meta-b1534a87-e18b-4f0a-89e2-08348d833056')
    loadAsJson = json.loads(response)
    print loadAsJson['status']

推荐答案

面对相同的问题,没有好的答案,我想出了您可能想要的解决方案,它不依赖于您正在使用的消息队列(aka Redis或RabbitMQ).对我来说,关键是celery.app.task.Task类中的update_state方法将task_id作为可选参数.就我而言,我正在通过多个工作程序节点运行长时间运行的文件复制和校验和任务,有时用户希望暂停一项正在运行的任务,以降低对存储的性能要求,以允许其他任务先完成.我还正在运行无状态的Flask REST API,以启动后端任务并检索正在运行的任务的状态,因此我需要一种方法来调用API来暂停和恢复任务.

Faced with the same question and no good answers I came up with solution you might like and it is not dependent on the message queue you are using (aka Redis or RabbitMQ). The key for me was that the update_state method in the celery.app.task.Task class takes task_id as an optional parameter. In my case I am running long running file copy and checksum tasks through multiple worker nodes and sometimes the user wants to pause one running task to reduce performance requirements on the storage to allow other tasks to finish first. I am also running a stateless Flask REST API to initiate the backend tasks and retrieve status of running tasks so I needed a way to have an API call come in to pause and resume the tasks.

这是我的测试功能,可以通过监视其自身状态来接收消息"以暂停自身:

Here is my test function which can receive a "message" to pause itself by monitoring it's own state:

celery.task(bind=True)
def long_test(self, i):
    print('long test starting with delay of ' + str(i) + 'seconds on each loop')
    print('task_id =' + str(self.request.id))
    self.update_state(state='PROCESSING')
    count = 0
    while True:
        task = celery.AsyncResult(self.request.id)
        while task.state == 'PAUSING' or task.state == 'PAUSED':
            if task.state == 'PAUSING':
                self.update_state(state='PAUSED')
            time.sleep(i)
        if task.state == 'RESUME':
            self.update_state(state='PROCESSING')
        print('long test loop ' + str(count) + ' ' + str(task.state))
        count += 1
        time.sleep(i)

然后,为了暂停或继续,我可以执行以下操作:

Then, in order to pause or resume I can do the following:

>>> from project.celeryworker.tasks import long_test
>>> from project import create_app, make_celery
>>> flaskapp = create_app()
>>> celery = make_celery(flaskapp)
>>> from celery.app.task import Task
>>> long_test.apply_async(kwargs={'i': 5})
<AsyncResult: bf19d50f-cf04-47f0-a069-6545fb253887>
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PAUSED'
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='RESUME')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PROCESSING'
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PAUSED'

这篇关于暂停芹菜任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆