Writing files asynchronously


Problem description

I've been trying to create a server process that receives an input file path and an output path from client processes asynchronously. The server does some database-reliant transformations, but for the sake of simplicity let's assume it merely converts everything to upper case. Here is a toy example of the server:

import asyncio
import aiofiles as aiof
import logging
import sys


ADDRESS = ("localhost", 10000)

logging.basicConfig(level=logging.DEBUG,
                    format="%(name)s: %(message)s",
                    stream=sys.stderr)

log = logging.getLogger("main")
loop = asyncio.get_event_loop()


async def server(reader, writer):
    log = logging.getLogger("process at {}:{}".format(*ADDRESS))
    paths = await reader.read()
    in_fp, out_fp = paths.splitlines()
    log.debug("connection accepted")
    log.debug("processing file {!r}, writing output to {!r}".format(in_fp, out_fp))
    async with aiof.open(in_fp, loop=loop) as inp, aiof.open(out_fp, "w", loop=loop) as out:
        async for line in inp:
            out.write(line.upper())
        out.flush()
    writer.write(b"done")
    await writer.drain()
    log.debug("closing")
    writer.close()
    return


factory = asyncio.start_server(server, *ADDRESS)
server = loop.run_until_complete(factory)
log.debug("starting up on {} port {}".format(*ADDRESS))

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug("closing server")
    server.close()
    loop.run_until_complete(server.wait_closed())
    log.debug("closing event loop")
    loop.close()

The client:

import asyncio
import logging
import sys
import random

ADDRESS = ("localhost", 10000)
MESSAGES = ["/path/to/a/big/file.txt\n", 
            "/output/file_{}.txt\n".format(random.randint(0, 99999))]

logging.basicConfig(level=logging.DEBUG,
                    format="%(name)s: %(message)s",
                    stream=sys.stderr)

log = logging.getLogger("main")
loop = asyncio.get_event_loop()

async def client(address, messages):
    log = logging.getLogger("client")
    log.debug("connecting to {} port {}".format(*address))
    reader, writer = await asyncio.open_connection(*address)
    writer.writelines([bytes(line, "utf8") for line in messages])
    if writer.can_write_eof():
        writer.write_eof()
    await writer.drain()

    log.debug("waiting for response")
    response = await reader.read()
    log.debug("received {!r}".format(response))
    writer.close()
    return


try:
    loop.run_until_complete(client(ADDRESS, MESSAGES))
finally:
    log.debug("closing event loop")
    loop.close()

I activated the server and several clients at once. The server's logs:

asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
process at localhost:10000: connection accepted
process at localhost:10000: processing file b'/path/to/a/big/file.txt', writing output to b'/output/file_79609.txt'
process at localhost:10000: connection accepted
process at localhost:10000: processing file b'/path/to/a/big/file.txt', writing output to b'/output/file_68917.txt'
process at localhost:10000: connection accepted
process at localhost:10000: processing file b'/path/to/a/big/file.txt', writing output to b'/output/file_2439.txt'
process at localhost:10000: closing
process at localhost:10000: closing
process at localhost:10000: closing

All the clients print this:

asyncio: Using selector: KqueueSelector
client: connecting to localhost port 10000
client: waiting for response
client: received b'done'
main: closing event loop

The output files are created, but they remain empty. I believe they are not being flushed. Any way I can fix it?

Recommended answer

You are missing an await before out.write() and out.flush():

import asyncio
from pathlib import Path

import aiofiles as aiof

FILENAME = "foo.txt"


async def bad():
    async with aiof.open(FILENAME, "w") as out:
        out.write("hello world")
        out.flush()
    print("done")


async def good():
    async with aiof.open(FILENAME, "w") as out:
        await out.write("hello world")
        await out.flush()
    print("done")


loop = asyncio.get_event_loop()

loop.run_until_complete(bad())
print(Path(FILENAME).stat().st_size)  # prints 0

loop.run_until_complete(good())
print(Path(FILENAME).stat().st_size)  # prints 11

However, I would strongly recommend trying to skip aiofiles and use regular, synchronous disk I/O, keeping asyncio for the network activity:

with open(file, "w") as f:  # regular, blocking file I/O
    async for s in network_request():  # asyncio for slow network work. measure it!
        f.write(s)  # local disk writes should be really quick. measure it!
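If the files are large enough that blocking writes could stall other connections, another option is to push the whole synchronous transformation onto a worker thread. A minimal sketch of that idea, assuming Python 3.9+ for asyncio.to_thread (the file names here are made up for the demo):

```python
import asyncio


def transform(in_fp: str, out_fp: str) -> None:
    # Plain, blocking file I/O: read each line, upper-case it, write it out.
    with open(in_fp) as inp, open(out_fp, "w") as out:
        for line in inp:
            out.write(line.upper())


async def handle(in_fp: str, out_fp: str) -> None:
    # Run the blocking transform in a worker thread so the event loop
    # stays free to serve other client connections in the meantime.
    await asyncio.to_thread(transform, in_fp, out_fp)


# Demo: write a small input file, then run the handler on it.
with open("demo_in.txt", "w") as f:
    f.write("hello\nworld\n")

asyncio.run(handle("demo_in.txt", "demo_out.txt"))
```

Inside the server coroutine above, the await asyncio.to_thread(...) call would simply replace the async with aiof.open(...) block.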

