Queue and ProcessPoolExecutor in Tornado

Problem description

I'm attempting to use the new Tornado queue object along with concurrent.futures to allow my webserver to pass off CPU-intensive tasks to other processes. I want to have access to the Future object that's returned from the ProcessPoolExecutor in the concurrent.futures module so that I can query its state to show on the front-end (e.g. show that the process is currently running; show that it has finished).

I seem to have two hurdles with this method:

  1. How can I submit multiple q.get() objects to the ProcessPoolExecutor while also having access to the returned Future objects?
  2. How can I let the HomeHandler get access to the Future object returned by the ProcessPoolExecutor so that I may show the state information on the front-end?

Thanks for any help.

import os

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado import gen
from tornado.options import define, options
from tornado.queues import Queue

from concurrent.futures import ProcessPoolExecutor

define("port", default=8888, help="run on the given port", type=int)
q = Queue(maxsize=2)


def expensive_function(input_dict):
    gen.sleep(1)


@gen.coroutine
def consumer():
    while True:
        input_dict = yield q.get()
        try:
            with ProcessPoolExecutor(max_workers=4) as executor:
                future = executor.submit(expensive_function, input_dict)
        finally:
            q.task_done()


@gen.coroutine
def producer(input_dict):
    yield q.put(input_dict)


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", HomeHandler),
        ]
        settings = dict(
            blog_title=u"test",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            debug=True,
        )
        super(Application, self).__init__(handlers, **settings)


class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.html")

    def post(self, *args, **kwargs):
        input_dict = {'foo': 'bar'}

        producer(input_dict)

        self.redirect("/")


def main():
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


def start_consumer():
    tornado.ioloop.IOLoop.current().spawn_callback(consumer)


if __name__ == "__main__":
    tornado.ioloop.IOLoop.current().run_sync(start_consumer)
    main()
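For reference, the state the question wants to surface can be read from a `concurrent.futures.Future` directly, independent of Tornado. A minimal standalone sketch (the function name is illustrative):

```python
import time
from concurrent.futures import ProcessPoolExecutor


def slow_square(x):
    """A stand-in for CPU-bound work."""
    time.sleep(0.5)
    return x * x


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(slow_square, 3)
        print(future.done())     # False: the worker is still sleeping
        print(future.result())   # blocks until the task completes, then prints 9
        print(future.done())     # True
```

`done()` and `running()` can be polled at any time without blocking, which is what a status endpoint would use.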


Answer

What are you trying to accomplish by combining a Queue and a ProcessPoolExecutor? The executor already has its own internal queue. All you need to do is make the ProcessPoolExecutor a global (it doesn't have to be a global, but you'll want to do something similar to a global even if you keep the queue; it doesn't make sense to create a new ProcessPoolExecutor on each pass through consumer's loop) and submit things to it directly from the handler:

@gen.coroutine
def post(self):
    input_dict = ...
    result = yield executor.submit(expensive_function, input_dict)
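Putting the advice together: a minimal sketch of the pattern, assuming a single module-level executor and a hypothetical `jobs` dict keyed by job id so a separate status handler can report whether a task is running or finished (handler names and URLs are illustrative, not from the original answer):

```python
import time
import uuid
from concurrent.futures import ProcessPoolExecutor

import tornado.web

# One module-level executor, reused across requests (never per-request).
executor = ProcessPoolExecutor(max_workers=4)
# Hypothetical registry mapping job ids to Future objects.
jobs = {}


def expensive_function(input_dict):
    time.sleep(1)  # stand-in for CPU-bound work in the worker process
    return input_dict


class SubmitHandler(tornado.web.RequestHandler):
    def post(self):
        job_id = uuid.uuid4().hex
        # submit() returns immediately with a concurrent.futures.Future.
        jobs[job_id] = executor.submit(expensive_function, {'foo': 'bar'})
        self.redirect("/status/" + job_id)


class StatusHandler(tornado.web.RequestHandler):
    def get(self, job_id):
        future = jobs[job_id]
        self.write("finished" if future.done() else "running")


application = tornado.web.Application([
    (r"/submit", SubmitHandler),
    (r"/status/(\w+)", StatusHandler),
])
```

Note that the worker function uses `time.sleep`, not `gen.sleep`: it runs in a separate process with no IOLoop, so coroutine utilities don't apply there.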
