Queue and ProcessPoolExecutor in Tornado
Question
I'm attempting to use the new Tornado queue object along with concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html) to allow my webserver to pass off cpu-intensive tasks to other processes. I want to have access to the Future object that's returned from the ProcessPoolExecutor from the concurrent.futures module so that I can query its state to show on the front-end (e.g. show the process is currently running; show that it has finished).
I seem to have two hurdles with this method:
- How can I submit multiple q.get() objects to the ProcessPoolExecutor while also having access to the returned Future objects?
- How can I let the HomeHandler get access to the Future object returned by the ProcessPoolExecutor so that I may show the state information on the front-end?
Thanks for any help.
import os

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado import gen
from tornado.options import define, options
from tornado.queues import Queue
from concurrent.futures import ProcessPoolExecutor

define("port", default=8888, help="run on the given port", type=int)

q = Queue(maxsize=2)


def expensive_function(input_dict):
    gen.sleep(1)


@gen.coroutine
def consumer():
    while True:
        input_dict = yield q.get()
        try:
            with ProcessPoolExecutor(max_workers=4) as executor:
                future = executor.submit(expensive_function, input_dict)
        finally:
            q.task_done()


@gen.coroutine
def producer(input_dict):
    yield q.put(input_dict)


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", HomeHandler),
        ]
        settings = dict(
            blog_title=u"test",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            debug=True,
        )
        super(Application, self).__init__(handlers, **settings)


class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.html")

    def post(self, *args, **kwargs):
        input_dict = {'foo': 'bar'}
        producer(input_dict)
        self.redirect("/")


def main():
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


def start_consumer():
    tornado.ioloop.IOLoop.current().spawn_callback(consumer)


if __name__ == "__main__":
    tornado.ioloop.IOLoop.current().run_sync(start_consumer)
    main()
Answer
What are you trying to accomplish by combining a Queue and a ProcessPoolExecutor? The executor already has its own internal queue. All you need to do is make the ProcessPoolExecutor a global (it doesn't have to be a global, but you'll want to do something similar to a global even if you keep the queue; it doesn't make sense to create a new ProcessPoolExecutor each time through consumer's loop) and submit things to it directly from the handler.
@gen.coroutine
def post(self):
    input_dict = ...
    result = yield executor.submit(expensive_function, input_dict)
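To also answer the status-reporting part of the question, here is a minimal sketch of that idea; the `jobs` registry, the `submit_job`/`job_state` helpers, and the job-id scheme are illustrative assumptions, not part of the original answer:

```python
import uuid
from concurrent.futures import ProcessPoolExecutor

# A single module-level executor shared by all handlers; it keeps its
# own internal task queue, so no tornado.queues.Queue is needed.
executor = ProcessPoolExecutor(max_workers=4)

# Hypothetical registry mapping job ids to Future objects, so a
# status endpoint can look them up later.
jobs = {}


def expensive_function(input_dict):
    # Stand-in for the real CPU-bound work; runs in a worker process.
    return sum(range(10 ** 6))


def submit_job(input_dict):
    """Submit work and return an id the front-end can poll with."""
    job_id = uuid.uuid4().hex
    jobs[job_id] = executor.submit(expensive_function, input_dict)
    return job_id


def job_state(job_id):
    """Report a Future's state as a front-end-friendly string."""
    future = jobs[job_id]
    if future.done():
        return "finished"
    if future.running():
        return "running"
    return "pending"
```

A handler's post() could call submit_job() and redirect, while a separate status handler returns job_state(job_id) for the front-end to poll.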