In Tornado, how to do non-blocking file read/write?

Problem Description

I've been using Tornado for a while now and I've encountered issues with slow timing (which I asked about in this question). One possible issue that was pointed out by a fellow user was that I was using regular open("...", 'w') to write to files in my co-routine and that this might be a blocking piece of code.

So my question is, is there a way to do non-blocking file IO in Tornado? I couldn't find anything in my research that fit my needs.
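For the simple case of a single blocking write, one option is to offload the call to a thread pool. A minimal sketch follows; it uses the asyncio event loop, which Tornado 5+ runs on, so the same `run_in_executor` technique works inside Tornado coroutines (Tornado's `IOLoop` also exposes a `run_in_executor` method). The helper name `write_file_async` and the file path are illustrative, not part of any API:

```python
import asyncio
import os
import tempfile


# Offload a blocking file write to the default thread pool executor, so the
# event loop keeps serving other coroutines while the write is in progress.
# write_file_async is an illustrative helper name, not a Tornado API.
async def write_file_async(path: str, data: str) -> int:
    loop = asyncio.get_running_loop()

    def blocking_write():
        with open(path, "w") as f:
            return f.write(data)  # Number of characters written

    # None -> use the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_write)


path = os.path.join(tempfile.gettempdir(), "demo_nonblocking_write.txt")
written = asyncio.run(write_file_async(path, "hello"))
```

This keeps the whole file in memory, though, which is exactly the limitation the answer below addresses for large files.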

Recommended Answer

I'm providing another answer because, as it turns out, reading/writing the whole file in a separate thread does not work for large files. You cannot receive or send the full contents of a big file in one chunk, because you may not have enough memory.

For me, it was not trivial to work out how to block the reader/writer thread when the chunk processor in the ioloop's main thread cannot keep up. The implementation below works efficiently both when the file read operation is much faster than the chunk processor and when it is the slower one. Synchronization is realized by combining an async queue with a lock, and it does not block the ioloop's thread in any way.

The lock is only RELEASED in the loop's thread; it is never acquired there, so there is no race condition.
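The handshake can be seen in isolation with plain `threading`, independent of Tornado (a minimal sketch with illustrative names): the producer acquires the lock twice, and the second acquire blocks until the consumer releases it:

```python
import threading
import time


# One-shot handshake: the producer acquires the lock twice; the second
# acquire blocks until the consumer releases the lock, signalling that the
# item has been accepted. (Illustrative demo, not Tornado code.)
def handshake_demo():
    lock = threading.Lock()
    accepted = []

    def consumer():
        time.sleep(0.05)        # Simulate slow processing
        accepted.append("chunk")
        lock.release()          # The consumer only ever RELEASES the lock

    lock.acquire()              # 1st acquire: the producer owns the lock
    threading.Thread(target=consumer).start()
    lock.acquire()              # 2nd acquire: blocks until the consumer releases
    lock.release()              # Clean up before returning
    return accepted
```

Python's `threading.Lock` is not owner-checked, so releasing it from a different thread than the one that acquired it is allowed, which is what makes this signalling pattern possible.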

I do not expect this to be accepted as an answer, but since it took me a while to figure out, I guess it may help others in their implementations.

This can be generalized not just for file read/write operations, but for any consumer/producer pair that has one side in a separate thread and the other side in the ioloop.

import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
from tornado.queues import Queue


def read_file(file_path, queue: Queue, io_loop: IOLoop, chunk_size: int = 64 * 1024):
    file_size = os.path.getsize(file_path)
    remaining = file_size
    lock = threading.Lock()

    async def putter(chunk, lock: threading.Lock):
        await queue.put(chunk)  # Called from the loop's thread -> may wait while the queue is full
        lock.release()          # Wake the reader thread after the chunk has been accepted

    def put(chunk, lock):
        """Put the chunk into the queue, and wait until it is accepted by the ioloop."""
        lock.acquire()  # Acquire in this thread
        io_loop.spawn_callback(putter, chunk, lock)  # Released in the loop's thread
        lock.acquire()  # Wait until the loop's thread has accepted the chunk for processing
        lock.release()  # Clean up before returning

    # Put the file size into the queue without waiting
    io_loop.spawn_callback(queue.put, file_size)

    # A with block ensures the file is closed even if an error occurs
    with open(file_path, "rb") as fin:
        while remaining > 0:
            chunk = fin.read(min(chunk_size, remaining))
            print("read", chunk)
            remaining -= len(chunk)
            time.sleep(1)  # Just for testing: simulate slow file reads.
            put(chunk, lock)

    # Put the EOF/terminator into the queue
    io_loop.spawn_callback(queue.put, None)


pool = ThreadPoolExecutor(3)


async def main():
    # Create a queue for sending chunks of data
    cq = Queue(maxsize=3)
    # Start the reader thread that reads in a separate thread
    pool.submit(read_file, __file__, cq, io_loop, 100)
    file_size = await cq.get()
    print("file size:", file_size)
    # Process chunks
    while True:
        item = await cq.get()
        # Terminator -> EOF
        if item is None:
            break
        print("got chunk:", repr(item))



if __name__ == '__main__':
    io_loop = IOLoop.current()
    # run_sync starts and stops the loop itself; no explicit stop()/start() needed
    io_loop.run_sync(main)
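The write direction can be sketched the same way: the loop produces chunks and a dedicated thread writes them to disk. Below is a hedged standard-library sketch (asyncio, which Tornado 5+ runs on; the names `writer_thread` and `write_chunks` are illustrative). A bounded `queue.Queue` provides backpressure, and the loop side offloads the potentially blocking `put` to the default executor so the loop itself never blocks:

```python
import asyncio
import os
import queue
import tempfile
import threading


# Writer side: consume chunks from a thread-safe queue and write them to
# disk. Blocking q.get() only blocks this thread, never the event loop.
def writer_thread(path, q: queue.Queue):
    with open(path, "wb") as f:
        while True:
            chunk = q.get()
            if chunk is None:   # EOF/terminator
                break
            f.write(chunk)


# Loop side: produce chunks. q.put blocks when the bounded queue is full,
# so it is run in the executor to keep the loop responsive.
async def write_chunks(path, chunks):
    q = queue.Queue(maxsize=3)
    t = threading.Thread(target=writer_thread, args=(path, q))
    t.start()
    loop = asyncio.get_running_loop()
    for chunk in chunks:
        await loop.run_in_executor(None, q.put, chunk)
    await loop.run_in_executor(None, q.put, None)  # Send the terminator
    await loop.run_in_executor(None, t.join)       # Wait for the writer to finish


path = os.path.join(tempfile.gettempdir(), "demo_write.bin")
asyncio.run(write_chunks(path, [b"alpha", b"beta"]))
```

Compared with the answer's lock handshake, the stdlib `queue.Queue` gives backpressure for free; the trade-off is that each `put` briefly occupies an executor thread.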
