如何使芹菜工人从任务中返回结果 [英] How to make Celery worker return results from task

查看:43
本文介绍了如何使芹菜工人从任务中返回结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个烧瓶应用程序调用任务.该任务从数据库中提取数据,绘制折线图并返回在html页面上呈现的html内容.在没有Celery的情况下,Flask应用程序可以正常工作并在客户端显示折线图,但是现在我想委托celery通过 RabbitMQ 代理运行任务,因为它可以运行,因为我可以在Celery shell中看到日志输出,但是生成的html内容永远不会发送回Flask服务器应用程序.该怎么做?

I have a flask app which calls a task. The task pulls data from database, plots line charts and returns html contents which is rendered on an html page. Without Celery the Flask app works fine and renders line chart on client side, but now I want to delegated celery to run task via RabbitMQ broker, which it runs as I can see the log outputs in Celery shell, but the resulting html contents never gets sent back to flask server app. How to do that?

这是对 http:/的跟进/stackoverflow.com/questions/37839551/how-to-restart-flask-process-by-refreshing-page/38146003#38146003 .

#server-celery.py

app = Flask(__name__)

@app.route('/',methods=['GET'])
def index():
   return render_template("index.html")

@app.route('/', methods=['GET', 'POST'])
def plotdata():
   form = InputForm(request.form)
   if request.method == 'POST' and form.validate():
       lineChart = task.main.delay(form.startdate.data, form.enddate.data) 
       return render_template('view.html', form=form, result= lineChart)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

并且该任务以 Celery 工作者的身份运行:

And the task running as Celery worker:

#task.py
from celery import Celery

broker = 'amqp://localhost:5672'
app = Celery(__name__, broker=broker)

def queryDatabase(param1, param2):
   #query database, return results
   return data

def plotLineChart(data):
   #plot data as line Chart
   return LineChartHtmlcontent

@app.task
def main(param1,param2):
    data = queryDatabase(param1,param2)
    return plotLineChart(data)

和客户端html页面:

and client html page:

<!--view.html-->

<form method=post action="">
<table>
  {% for field in form %}
    <tr>
    <td>{{ field.label }}</td>
        <td>{{ field }}</td>

    </tr>
  {% endfor %}
</table>
<p><input type=submit value=Submit></form></p>

<p>
{% if result != None %}
<img src="{{ result }}" width="500">
{% endif %}
</p>

推荐答案

更好的解决方案是让任务像预期的那样使用celery异步运行,并使用页面上的javascript定期轮询celery任务,查看状态.

A better solution is simply to let the task run asynchronously using celery like it was intended to be used and use javascript on the page to poll the celery task periodically to see the status.

首先,当您创建芹菜任务时,请使用bind = True参数.这使您可以将 self 传递给函数.

First, when you create your celery task use the bind=True parameter. This allows you to pass self into the function.

@app.task(bind=True)
def main(self, param1, param2):
    self.update_state(state='PENDING')
    data = queryDatabase(param1,param2)
    self.update_state(state='COMPLETE')
    return plotLineChart(data)

您可以看到我们现在可以更新任务的状态.您还可以修改代码以允许元数据进入 update_state 方法,例如: self.update_state(state ='PROGRESS',meta = {'current':0,'total':12})

You can see that we can now update the state of the task. You can modify the code to allow metadata into the update_state method as well, for example: self.update_state(state='PROGRESS', meta={'current': 0, 'total': 12})

现在,您需要轮询celery任务以查看其运行情况.

Now you need to poll the celery task to see how it's doing.

@blueprint.route('/status/<task_id>', methods=['GET'])
def taskstatus(task_id=None):
    task = main.AsyncResult(task_id)
    response = {
        'state': task.state,
    }
    return jsonify(response)

当您使用任务ID轮询URL时,这将返回json对象.调用 .delay()

This will return a json object when you poll the URL with the task id. You can get the task id by using lineChart.id after you call .delay()

请参阅miguel grinberg撰写的出色教程: http://blog.miguelgrinberg.com/post/using-celery-with-flask

Please see the excellent tutorial by miguel grinberg: http://blog.miguelgrinberg.com/post/using-celery-with-flask

这篇关于如何使芹菜工人从任务中返回结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆