Celery Task Custom tracking method

Question

My main problem is that I need to know whether a task is still queued, started, or has been revoked.

I can't do this with Celery and Redis alone, because results are deleted from Redis 24 hours after they are stored.

I had some ideas, but I think the most solid one is to track tasks in a database, manually adding the critical information I need about the task a user is running.

Are there hooks that can run before a task starts, and can I also work with the database manually when I create or revoke tasks? I won't create a new row for every task; instead I'll just update one row per user, since I'm only interested in each user's last task.
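For the hooks that run before a task starts, Celery's signals (task_prerun, task_postrun, task_revoked) are the usual mechanism. Below is a minimal sketch, where record_task_state is a hypothetical helper that upserts the per-user row in your own database:

from celery.signals import task_prerun, task_revoked

@task_prerun.connect
def mark_started(sender=None, task_id=None, task=None, **kwargs):
    # Runs in the worker process just before the task body executes.
    record_task_state(task_id, 'STARTED')  # hypothetical helper writing to your DB

@task_revoked.connect
def mark_revoked(sender=None, request=None, **kwargs):
    # Runs in the worker when a revoke for this task is processed.
    record_task_state(request.id, 'REVOKED')  # hypothetical helper writing to your DB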

Answer

You probably have to combine multiple approaches. If your results expire in the backend (which is reasonable), you have to employ different storage, such as a database, for long-term archiving of task states. For a start, you can enable task_track_started so that tasks report the STARTED state when a worker begins executing them. Then periodically check the result backend for status updates on tasks that are not yet in a ready state (SUCCESS, FAILURE, or REVOKED). Once a task reaches a final state, remove its result from the backend using the forget() method.
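As an illustration, the relevant configuration might look like this (a minimal sketch using Celery 4+ lowercase setting names; the app name and Redis URLs are placeholders):

from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/0',    # placeholder broker URL
             backend='redis://localhost:6379/1')   # placeholder result backend

app.conf.task_track_started = True  # tasks report STARTED once a worker picks them up
app.conf.result_expires = 86400     # results are dropped from the backend after 24 hours (the default)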

The only problem is with revoked tasks. If no workers are available, revoking a task has no effect (which is why you should always wait for a reply when calling revoke). If the workers are busy and the task therefore stays in the message queue, the workers merely note that the task should be discarded when they pick it up, and that note is kept only in the workers' state. Once they do pick it up, they drop the task and the result eventually contains the REVOKED state. The key point is that revoked tasks are maintained only in the workers' state, so you should use the --statedb option to persist that state in case a worker crashes. Otherwise, already-revoked tasks will happily get processed by the same or another worker.
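For reference, persisting the revoked-task list across worker restarts only requires starting the worker with a state file; the app name and path below are placeholders:

celery -A proj worker -l info --statedb=/var/run/celery/worker.state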

Your best option, I guess, is to call the revoke command and, if you get a reply from the workers, set the task's internal status in your database to something like FLAGGED_REVOKED. In the status update loop, update a revoked task's status only if it is no longer PENDING.

I have a simple job scheduling app that uses APScheduler as the scheduler and Celery as the execution layer. Information about the jobs, job runs and the schedule is kept in MongoDB. Here is the code I use to cancel a job:

database = scheduler._jobstores['default'].collection.database  # reuse APScheduler's MongoDB connection
collection = database['runs']

# Only try to revoke tasks that have not reached a final state yet
run = collection.find_one({'job_id': job_id, '_id': run_id})
if run.get('task_state') in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
    # reply=True waits for the workers to acknowledge the revoke request
    reply = celery.control.revoke(run['task_id'], terminate=terminate, reply=True)
    if reply:
        collection.update_one({'_id': run['_id']},
                              {'$set': {'task_state': 'FLAGGED_REVOKED'}})
    else:
        raise Exception('Failed to revoke the task (no reply received)')
else:
    raise Exception('Job execution cannot be canceled')

This is my status-update code (which is kept as an internal APScheduler job that runs every few seconds):

import dateparser  # e.g. the 'dateparser' package; any date parser with a parse() function works here
from celery.result import AsyncResult

database = scheduler._jobstores['default'].collection.database  # reuse APScheduler's MongoDB connection
collection = database['runs']

# Fetch every run whose task has not reached a final state yet
runs = collection.find({
    'task_id': {'$exists': True},
    'task_state': {'$nin': ['SUCCESS', 'FAILURE', 'REVOKED']}
})
for run in runs:
    result = AsyncResult(run['task_id'],
                         backend=celery.backend, app=celery)
    if run['task_state'] == 'FLAGGED_REVOKED' and result.state == 'PENDING':
        # A revoke was requested but the backend still reports PENDING:
        # keep our FLAGGED_REVOKED marker instead of regressing the state.
        update = {'task_state': 'FLAGGED_REVOKED'}
    else:
        update = {'task_state': result.state}
    if result.state == 'FAILURE':
        update['exception'] = str(result.result)
        update['traceback'] = result.traceback
    elif result.state == 'SUCCESS':
        update['result'] = result.result
    if result.date_done:
        # Depending on the result serializer, date_done may be a string or a datetime
        date_done = dateparser.parse(result.date_done) \
            if isinstance(result.date_done, str) else result.date_done
        update['finish_time'] = date_done
    try:
        collection.update_one({'_id': run['_id']}, {'$set': update})
    except Exception as e:
        print('Failed to update task status: %s' % e)
    else:
        if result.state in ['SUCCESS', 'FAILURE', 'REVOKED']:
            # The final state is now stored in MongoDB, so drop the result from the backend
            result.forget()
