Background tasks in Flask


Question

I am writing a web application which will do some heavy work. With that in mind, I thought of making these tasks background (non-blocking) tasks, so that other requests are not blocked by the previous ones.

I went with daemonizing the thread so that it doesn't exit once the main thread is finished (since I am using threaded=True). Now, if a user sends a request, my code will immediately tell them that their request is in progress (it'll be running in the background), and the application is ready to serve other requests. My current application code looks something like this:

from flask import Flask
from flask import abort
import threading

class threadClass:

    def __init__(self):
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True                       # Daemonize thread
        thread.start()                             # Start the execution

    def run(self):
        # This might take several minutes to complete
        someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
def start():
    try:
        begin = threadClass()
    except Exception:
        abort(500)

    return "Task is in progress"

def main():
    """
    Main entry point into program execution

    PARAMETERS: none
    """
    app.run(host='0.0.0.0', threaded=True)

main()

I just want it to be able to handle a few concurrent requests (it's not going to be used in production).

Could I have done this better? Did I miss anything? I was going through Python's multiprocessing package and found this:

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

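For illustration (this sketch is mine, not part of the original question): the multiprocessing API deliberately mirrors threading, but each Process gets its own interpreter and its own GIL, so CPU-bound work can actually run in parallel.

from multiprocessing import Process

def cpu_bound(n):
    # Pure-Python arithmetic; with threads this would be serialized by the GIL.
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    # Same start()/join() shape as threading.Thread, but these four loops
    # run in separate processes and can use four cores at once.
    workers = [Process(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
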
Can I daemonize a process using multiprocessing? How can I achieve something better than what I have with the threading module?

Okay, so I went through Python's multiprocessing package; it is similar to threading.

from flask import Flask
from flask import abort
from multiprocessing import Process

class processClass:

    def __init__(self):
        p = Process(target=self.run, args=())
        p.daemon = True                       # Daemonize it
        p.start()                             # Start the execution

    def run(self):
        # This might take several minutes to complete
        someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
def start():
    try:
        begin = processClass()
    except Exception:
        abort(500)

    return "Task is in progress"

def main():
    """
    Main entry point into program execution

    PARAMETERS: none
    """
    app.run(host='0.0.0.0', threaded=True)

main()

Does the above approach look good?

Recommended answer

Best practice

The best way to implement background tasks in Flask is with Celery, as explained in this SO post. A good starting point is the official Flask documentation and the Celery documentation.

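For context, a minimal Celery setup could look like the sketch below. It is not from the original answer; the Redis broker URL and the some_heavy_function name are placeholder assumptions. The worker runs in a separate process (started with celery -A yourmodule worker), so the Flask process returns immediately.

from celery import Celery
from flask import Flask

app = Flask(__name__)

# Assumption: a Redis instance is available as broker and result backend.
celery = Celery(app.name, broker='redis://localhost:6379/0',
                backend='redis://localhost:6379/0')

@celery.task
def some_heavy_function():
    ...  # the long-running work goes here

@app.route('/start', methods=['POST'])
def start():
    result = some_heavy_function.delay()  # enqueue and return immediately
    return {'TaskId': result.id}, 202
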
As @MrLeeh pointed out in a comment, Miguel Grinberg presented a solution in his Pycon 2016 talk by implementing a decorator. I want to emphasize that I have the highest respect for his solution; he called it a "crazy solution" himself. The code below is a minor adaptation of it.

Don't use this in production! The main reason is that this app has a memory leak: entries in the global tasks dictionary are never removed. Even if you fix the memory leak issue, maintaining this sort of code is hard. If you just want to play around or use this in a private project, read on.

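One way to plug the leak, if you insist on this pattern, is to prune finished tasks periodically. This is a sketch of my own, not part of the talk as reproduced here; it assumes each entry gets a 'completed' timestamp when its task finishes.

import threading
import time

def prune_old_tasks(max_age=300):
    # Hypothetical cleanup loop: drop tasks finished more than max_age seconds
    # ago. 'tasks' is the same global dictionary used by the decorator below.
    while True:
        time.sleep(60)
        cutoff = time.time() - max_age
        for task_id in list(tasks):
            completed = tasks[task_id].get('completed')
            if completed is not None and completed < cutoff:
                tasks.pop(task_id, None)

threading.Thread(target=prune_old_tasks, daemon=True).start()
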
Assume you have a long-running function call in your /foo endpoint. I mock this with a 10-second sleep timer. If you call the endpoint three times, it will take 30 seconds to finish.

Miguel Grinberg's decorator solution is implemented in flask_async. It runs a new thread in a Flask context which is identical to the current Flask context. Each thread is issued a new task_id. The result is saved in the global dictionary tasks[task_id]['result'].

With the decorator in place, you only need to decorate the endpoint with @flask_async and the endpoint is asynchronous - just like that!

import threading
import time
import uuid
from functools import wraps

from flask import Flask, current_app, request, abort
from werkzeug.exceptions import HTTPException, InternalServerError

app = Flask(__name__)
tasks = {}


def flask_async(f):
    """
    This decorator transforms a sync route to asynchronous by running it in a background thread.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        def task(app, environ):
            # Create a request context similar to that of the original request
            with app.request_context(environ):
                try:
                    # Run the route function and record the response
                    tasks[task_id]['result'] = f(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['result'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['result'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task': threading.Thread(
            target=task, args=(current_app._get_current_object(), request.environ))}
        tasks[task_id]['task'].start()

        # Return a 202 response, with an id that the client can use to obtain task status
        return {'TaskId': task_id}, 202

    return wrapped


@app.route('/foo')
@flask_async
def foo():
    time.sleep(10)
    return {'Result': True}


@app.route('/foo/<task_id>', methods=['GET'])
def foo_results(task_id):
    """
        Return results of asynchronous task.
        If this request returns a 202 status code, it means that task hasn't finished yet.
        """
    task = tasks.get(task_id)
    if task is None:
        abort(404)
    if 'result' not in task:
        return {'TaskID': task_id}, 202
    return task['result']


if __name__ == '__main__':
    app.run(debug=True)

However, you need a little trick to get your results. The endpoint /foo will only return the HTTP code 202 and the task id, but not the result. You need another endpoint /foo/<task_id> to get the result. Here is an example for localhost:

import time
import requests

task_ids = [requests.get('http://127.0.0.1:5000/foo').json().get('TaskId')
            for _ in range(2)]
time.sleep(11)
results = [requests.get(f'http://127.0.0.1:5000/foo/{task_id}').json()
           for task_id in task_ids]
# [{'Result': True}, {'Result': True}]

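In a real client you would poll instead of sleeping for a fixed 11 seconds. Here is a small sketch (my own, using only the two endpoints defined above):

import time
import requests

def wait_for_result(task_id, poll_interval=1.0, timeout=60):
    # Poll /foo/<task_id> until the task stops answering 202 ("still running").
    deadline = time.time() + timeout
    while time.time() < deadline:
        resp = requests.get(f'http://127.0.0.1:5000/foo/{task_id}')
        if resp.status_code != 202:
            return resp.json()
        time.sleep(poll_interval)
    raise TimeoutError(f'Task {task_id} did not finish within {timeout} seconds')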