python asyncio如何读取StdIn并写入StdOut? [英] python asyncio how to read StdIn and write to StdOut?

查看:29
本文介绍了python asyncio如何读取StdIn并写入StdOut?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要异步读取 StdIn 以获取消息(json 由 \r\n 终止),并在处理异步后将更新的消息写入 StdOut.

目前我正在同步进行:

class SyncIOStdInOut():def write(self, payload: str):sys.stdout.write(payload)sys.stdout.write('\r\n')sys.stdout.flush()def read(self) ->字符串:有效载荷 = sys.stdin.readline()返回有效载荷

如何以异步方式执行相同的操作?

解决方案

以下是使用 asyncio 流(适用于 Unix).

导入异步导入系统异步定义 connect_stdin_stdout():loop = asyncio.get_event_loop()阅读器 = asyncio.StreamReader()协议 = asyncio.StreamReaderProtocol(reader)等待 loop.connect_read_pipe(lambda:协议,sys.stdin)w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)返回读者,作者异步定义主():读者,作者 = 等待 connect_stdin_stdout()为真:res = 等待 reader.read(100)如果不是资源:休息writer.write(res)如果 __name__ == __main__":asyncio.run(main())


作为即用型解决方案,您可以使用 aioconsole 库.它实现了类似的方法,但也提供了额外的有用的异步等效于 inputprintexeccode.interact>:

from aioconsole import get_standard_streams异步定义主():读者,作者 = 等待 get_standard_streams()

更新:

让我们试着弄清楚函数 connect_stdin_stdout 是如何工作的.

  1. 获取当前事件循环:

loop = asyncio.get_event_loop()

  1. 创建StreamReader 实例.

reader = asyncio.StreamReader()

通常,StreamReader/StreamWriter 类不打算直接实例化,只能作为 open_connection()start_server 等函数的结果使用().StreamReader 为一些数据流提供一个缓冲的异步接口.一些源代码(库代码)调用其函数如feed_datafeed_eof,数据被缓冲,可以使用文档化 接口协程 read(), readline()

  1. 创建StreamReaderProtocol 实例.

protocol = asyncio.StreamReaderProtocol(reader)

这个类是从asyncio.ProtocolFlowControlMixin 派生的,有助于在ProtocolStreamReader 之间进行适配.它覆盖了诸如data_receivedeof_receivedProtocol 方法并调用StreamReader 方法feed_data.

  1. 在事件循环中注册标准输入流stdin.

await loop.connect_read_pipe(lambda: protocol, sys.stdin)

connect_read_pipe 函数将一个类似文件的对象作为 pipe 参数.stdin 是一个类文件对象.从现在开始,所有从stdin读取的数据都会落入StreamReaderProtocol,然后传入StreamReader

  1. 在事件循环中注册标准输出流stdout.

w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)

connect_write_pipe 中,您需要传递一个协议工厂,该工厂创建实现StreamWriter.drain() 的流控制逻辑的协议实例.该逻辑在 FlowControlMixin 类中实现.也StreamReaderProtocol 继承自它.

  1. 创建StreamWriter 实例.

writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

该类使用write()writelines()等函数将传递给它的数据转发到底层transport.

protocol 用于支持 drain() 函数等待底层传输已刷新其内部缓冲区并可以再次写入的那一刻.>

reader 是一个可选参数,可以是None,它也用来支持drain()函数,在开头此函数检查是否为读取器设置了异常,例如,由于连接丢失(与套接字和双向连接相关),然后 drain() 也会抛出异常.>

您可以在这个很棒的 StreamWriter 和 drain() 函数的更多信息>回答.

更新 2:

使用 \r\n 分隔符读取行 readuntil 可以使用

I have a need to async read StdIn in order to get messages (json terminated by \r\n) and after processing async write updated message to StdOut.

At the moment I am doing it synchronous like:

class SyncIOStdInOut():
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload=sys.stdin.readline()
        return  payload

How to do the same but asynchronously?

解决方案

Here's an example of echo stdin to stdout using asyncio streams (for Unix).

import asyncio
import sys


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer


async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:
            break
        writer.write(res)


if __name__ == "__main__":
    asyncio.run(main())


As a ready-to-use solution, you could use aioconsole library. It implements a similar approach, but also provide additional useful asynchronous equivalents to input, print, exec and code.interact:

from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()

Update:

Let's try to figure out how the function connect_stdin_stdout works.

  1. Get the current event loop:

loop = asyncio.get_event_loop()

  1. Create StreamReader instance.

reader = asyncio.StreamReader()

Generally, StreamReader/StreamWriter classes are not intended to be directly instantiated and should only be used as a result of functions such as open_connection() and start_server(). StreamReader provides a buffered asynchronous interface to some data stream. Some source(library code) calls its functions such as feed_data, feed_eof, the data is buffered and can be read using the documented interface coroutine read(), readline(), and etc.

  1. Create StreamReaderProtocol instance.

protocol = asyncio.StreamReaderProtocol(reader)

This class is derived from asyncio.Protocol and FlowControlMixin and helps to adapt between Protocol and StreamReader. It overrides such Protocol methods as data_received, eof_received and calls StreamReader methods feed_data.

  1. Register standard input stream stdin in the event loop.

await loop.connect_read_pipe(lambda: protocol, sys.stdin)

The connect_read_pipe function takes as a pipe parameter a file-like object. stdin is a file-like object. From now, all data read from the stdin will fall into the StreamReaderProtocol and then pass into StreamReader

  1. Register standard output stream stdout in the event loop.

w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)

In connect_write_pipe you need to pass a protocol factory that creates protocol instances that implement flow control logic for StreamWriter.drain(). This logic is implemented in the class FlowControlMixin. Also StreamReaderProtocol inherited from it.

  1. Create StreamWriter instance.

writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

This class forwards the data passed to it using functions write(), writelines() and etc. to the underlying transport.

protocol is used to support the drain() function to wait for the moment that the underlying transport has flushed its internal buffer and is available for writing again.

reader is an optional parameter and can be None, it is also used to support the drain() function, at the start of this function it is checked if an exception was set for the reader, for example, due to a connection lost (relevant for sockets and bidirectional connections), then drain() will also throw an exception.

You can read more about StreamWriter and drain() function in this great answer.

Update 2:

To read lines with \r\n separator readuntil can be used

这篇关于python asyncio如何读取StdIn并写入StdOut?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆