Design of asynchronous request and blocking processing using Tornado


Question



I'm trying to implement a Python app that uses async functions to receive and emit messages over NATS, using a Tornado-based client. When a message is received, a blocking function must be called. I'm trying to run that function on a separate thread, so that message reception and publication stay responsive and incoming messages can be placed in a Tornado queue for the blocking function to process later.

I'm very new to Tornado (and to Python multithreading), but after reading the Tornado documentation and other sources several times, I've been able to put together a working version of the code, which looks like this:

import tornado.gen
import tornado.ioloop
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from nats.io.client import Client as NATS

EXECUTOR = ThreadPoolExecutor(max_workers=4)  # pool for the blocking calls
messageQueue = Queue()
nc = NATS()

@tornado.gen.coroutine
def consumer():
    def processMessage(currentMessage):
        # process the message (the blocking work happens here) ...
        pass

    while True:
        currentMessage = yield messageQueue.get()
        try:
            # execute the call in a separate thread to prevent blocking the queue
            EXECUTOR.submit(processMessage, currentMessage)
        finally:
            messageQueue.task_done()

@tornado.gen.coroutine
def producer():
    @tornado.gen.coroutine
    def enqueueMessage(currentMessage):
        yield messageQueue.put(currentMessage)

    yield nc.subscribe("new_event", "", enqueueMessage)

@tornado.gen.coroutine
def main():
    tornado.ioloop.IOLoop.current().spawn_callback(consumer)
    yield producer()

if __name__ == '__main__':
    main()
    tornado.ioloop.IOLoop.current().start()

My questions are:

1) Is this the correct way of using Tornado to call a blocking function?

2) What's the best practice for implementing a consumer/producer scheme that is always listening? I'm afraid my while True: statement is actually blocking the processor...

3) How can I inspect the Queue to make sure a burst of calls is being enqueued? I've tried using Queue().qSize(), but it always returns zero, which makes me wonder if the enqueuing is done correctly or not.

Solution

The general rule (credit to NYKevin) is:

  • multiprocessing for CPU- and GPU-bound computations.
  • Event-driven stuff for non-blocking I/O (which should be preferred over blocking I/O where possible, since it scales much more effectively).
  • Threads for blocking I/O (you can also use multiprocessing, but the per-process overhead probably isn't worth it).

ThreadPoolExecutor for I/O, ProcessPoolExecutor for CPU. Both have an internal queue, and both scale up to the specified max_workers. See the concurrent.futures documentation for more on executors.
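As a minimal sketch of that queuing behavior (the task function here is illustrative, not from the question): submit more blocking tasks than max_workers, and the extra submissions simply wait in the executor's internal queue until a worker frees up.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_task(n):
    # stand-in for blocking I/O (e.g. a slow network call)
    time.sleep(0.05)
    return n * 2

# Two workers; the other four submissions wait in the pool's internal queue.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(blocking_task, i) for i in range(6)]
    results = [f.result() for f in futures]

print(results)  # [0, 2, 4, 6, 8, 10]
```

Swapping in ProcessPoolExecutor gives the same interface for CPU-bound work.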

So the answers are:

  1. Reimplementing a pool yourself is just overhead. Whether to use threads or processes depends on what you plan to do.
  2. while True does not block as long as the loop body yields on async calls (even a yield gen.sleep(0.01)); each yield hands control back to the IOLoop.
  3. qsize() is the right method to call, but note that Queue().qSize() constructs a brand-new, empty queue; inspect the shared instance with messageQueue.qsize() instead. Since I haven't run/debugged this and would take a different approach (an existing pool), it is hard to pinpoint other problems here.
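To illustrate points 2 and 3 with something self-contained: here is the same consumer/queue/pool pattern sketched with stdlib asyncio (which modern Tornado runs on top of). The names process_message and the None shutdown sentinel are hypothetical, added only so the demo terminates; the structure mirrors the question's consumer.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

EXECUTOR = ThreadPoolExecutor(max_workers=4)

def process_message(msg):
    # blocking work: runs on a pool thread, off the event loop
    return msg.upper()

async def consumer(queue, results):
    loop = asyncio.get_running_loop()
    while True:  # not busy-spinning: each await yields control to the loop
        msg = await queue.get()
        if msg is None:  # sentinel so this demo can finish
            queue.task_done()
            break
        # hand the blocking call to the pool and await its result
        results.append(await loop.run_in_executor(EXECUTOR, process_message, msg))
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    task = asyncio.create_task(consumer(queue, results))
    for msg in ("a", "b", "c"):
        await queue.put(msg)
    # inspect the *shared* queue object; Queue().qsize() would query a new, empty queue
    print("queued:", queue.qsize())
    await queue.put(None)
    await task
    return results

out = asyncio.run(main())
print(out)  # ['A', 'B', 'C']
```

The while True loop spends nearly all its time suspended at await queue.get(), so it costs essentially nothing while idle.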
