如何使用Flask路由中的线程 [英] How to use threads within a Flask route

查看:486
本文介绍了如何使用Flask路由中的线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个python flask应用程序。我想在响应一个特定的路由时使用并发,而不在每个请求上创建额外的线程。

I have a python flask app. I would like to use concurrency when responding to a specific route without creating extra threads on every request.

有一个路由定义如下:

def sentence_numfound(path):
    nf = util.NumFound(path)
    return json.dumps(nf.results(path))

nf.results()需要在返回前发出多个http请求,在平行下。目前我这样做:

nf.results() needs to issue multiple http requests before returning, and I would like to do them in parallel. Currently I'm doing this:

class NumFound:
    def __init__(self, path):
        queries = get_queries(path) # A list
        self.__results = [{}] * len(queries)
        self.queue = Queue.Queue()
        for i, q in enumerate(queries):
            self.queue.put((i, q))

    def results(self):
        num_workers = 31
        for i in range(num_workers):
            t = threading.Thread(target=self.worker)
            t.daemon = True
            t.start()
        self.queue.join()
        return self.__results

    def worker(self):
        while True:
            i, q = self.queue.get()
            self.__results[i] = foo(q)
            self.queue.task_done()

问题是新线程是在每个请求创建的,没有办法关闭它们。最终,路由失败并出现错误,因为python不能创建任何更多的线程。

The problem is that new threads are created on every request and there's no way to close them. Eventually the route fails with an error because python can't create any more threads.

有一个简单的方法来重用线程吗?或者另一种方式来实现并发?

Is there a simple way to reuse the threads? Or another way to achieve the concurrency?

推荐答案

我认为你会接近你的实现使用 multiprocessing.Pool

I think you will get close to your implementation using a multiprocessing.Pool.

您可以如下设置工作池:

You can set up a pool of workers as follows:

from multiprocessing import Pool
pool = Pool(processes=31)

然后你所有的路线需要提交作业到池,的他们要做。我无法测试这个,因为你没有提供足够的代码,但它可能看起来更多或更少这样:

Then all your route needs to do is submit the jobs to the pool and wait for all of them to be done. I can't test this because you haven't provided enough code, but it may look more or less like this:

def sentence_numfound(path):
    return jsonify(pool.map(foo, get_queries(path)))

这基本上为池中拥有的进程中的每个查询调用 foo(query),所有这些都并行。所有作业完成后, map()调用将返回。返回值是一个具有结果的数组,按照与输入数组相同的顺序。

This basically calls foo(query) for each query in the processes owned by the pool, all in parallel. The map() call will return when all the jobs are done. The return value is an array with the results, in the same order as the input array.

我希望这有助于!

这篇关于如何使用Flask路由中的线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆