python asyncio如何读取StdIn并写入StdOut? [英] python asyncio how to read StdIn and write to StdOut?
问题描述
我需要异步读取 StdIn 以获取消息(json 由 \r\n 终止),并在处理异步后将更新的消息写入 StdOut.
目前我正在同步进行:
class SyncIOStdInOut():def write(self, payload: str):sys.stdout.write(payload)sys.stdout.write('\r\n')sys.stdout.flush()def read(self) ->字符串:有效载荷 = sys.stdin.readline()返回有效载荷
如何以异步方式执行相同的操作?
以下是使用 asyncio 流(适用于 Unix).
导入异步导入系统异步定义 connect_stdin_stdout():loop = asyncio.get_event_loop()阅读器 = asyncio.StreamReader()协议 = asyncio.StreamReaderProtocol(reader)等待 loop.connect_read_pipe(lambda:协议,sys.stdin)w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)返回读者,作者异步定义主():读者,作者 = 等待 connect_stdin_stdout()为真:res = 等待 reader.read(100)如果不是资源:休息writer.write(res)如果 __name__ == __main__":asyncio.run(main())
作为即用型解决方案,您可以使用 aioconsole 库.它实现了类似的方法,但也提供了额外的有用的异步等效于 input
、print
、exec
和 code.interact
>:
from aioconsole import get_standard_streams异步定义主():读者,作者 = 等待 get_standard_streams()
更新:
让我们试着弄清楚函数 connect_stdin_stdout
是如何工作的.
- 获取当前事件循环:
loop = asyncio.get_event_loop()
- 创建
StreamReader
实例.
reader = asyncio.StreamReader()
通常,StreamReader/StreamWriter
类不打算直接实例化,只能作为 open_connection()
和 start_server 等函数的结果使用()
.StreamReader
为一些数据流提供一个缓冲的异步接口.一些源代码(库代码)调用其函数如feed_data
、feed_eof
,数据被缓冲,可以使用文档化 接口协程 read()
, readline()
等
- 创建
StreamReaderProtocol
实例.
protocol = asyncio.StreamReaderProtocol(reader)
这个类是从asyncio.Protocol
和FlowControlMixin
派生的,有助于在Protocol
和StreamReader
之间进行适配.它覆盖了诸如data_received
、eof_received
等Protocol
方法并调用StreamReader
方法feed_data
.
- 在事件循环中注册标准输入流
stdin
.
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
connect_read_pipe
函数将一个类似文件的对象作为 pipe
参数.stdin
是一个类文件对象.从现在开始,所有从stdin
读取的数据都会落入StreamReaderProtocol
,然后传入StreamReader
- 在事件循环中注册标准输出流
stdout
.
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)
在connect_write_pipe
中,您需要传递一个协议工厂,该工厂创建实现StreamWriter.drain()
的流控制逻辑的协议实例.该逻辑在 FlowControlMixin
类中实现.也StreamReaderProtocol
继承自它.
- 创建
StreamWriter
实例.
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
该类使用write()
、writelines()
等函数将传递给它的数据转发到底层transport
.
protocol
用于支持 drain()
函数等待底层传输已刷新其内部缓冲区并可以再次写入的那一刻.>
reader
是一个可选参数,可以是None
,它也用来支持drain()
函数,在开头此函数检查是否为读取器设置了异常,例如,由于连接丢失(与套接字和双向连接相关),然后 drain()
也会抛出异常.>
您可以在这个很棒的 StreamWriter 和 drain()
函数的更多信息>回答.
更新 2:
使用 \r\n
分隔符读取行 readuntil 可以使用
I have a need to async read StdIn in order to get messages (json terminated by \r\n) and after processing async write updated message to StdOut.
At the moment I am doing it synchronous like:
class SyncIOStdInOut():
def write(self, payload: str):
sys.stdout.write(payload)
sys.stdout.write('\r\n')
sys.stdout.flush()
def read(self) -> str:
payload=sys.stdin.readline()
return payload
How to do the same but asynchronously?
Here's an example of echo stdin
to stdout
using asyncio streams (for Unix).
import asyncio
import sys
async def connect_stdin_stdout():
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
return reader, writer
async def main():
reader, writer = await connect_stdin_stdout()
while True:
res = await reader.read(100)
if not res:
break
writer.write(res)
if __name__ == "__main__":
asyncio.run(main())
As a ready-to-use solution, you could use aioconsole library. It implements a similar approach, but also provide additional useful asynchronous equivalents to input
, print
, exec
and code.interact
:
from aioconsole import get_standard_streams
async def main():
reader, writer = await get_standard_streams()
Update:
Let's try to figure out how the function connect_stdin_stdout
works.
- Get the current event loop:
loop = asyncio.get_event_loop()
- Create
StreamReader
instance.
reader = asyncio.StreamReader()
Generally, StreamReader/StreamWriter
classes are not intended to be directly instantiated and should only be used as a result of functions such as open_connection()
and start_server()
.
StreamReader
provides a buffered asynchronous interface to some data stream. Some source(library code) calls its functions such as feed_data
, feed_eof
, the data is buffered and can be read using the documented interface coroutine read()
, readline()
, and etc.
- Create
StreamReaderProtocol
instance.
protocol = asyncio.StreamReaderProtocol(reader)
This class is derived from asyncio.Protocol
and FlowControlMixin
and helps to adapt between Protocol
and StreamReader
. It overrides such Protocol
methods as data_received
, eof_received
and calls StreamReader
methods feed_data
.
- Register standard input stream
stdin
in the event loop.
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
The connect_read_pipe
function takes as a pipe
parameter a file-like object. stdin
is a file-like object. From now, all data read from the stdin
will fall into the StreamReaderProtocol
and then pass into StreamReader
- Register standard output stream
stdout
in the event loop.
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)
In connect_write_pipe
you need to pass a protocol factory that creates protocol instances that implement flow control logic for StreamWriter.drain()
. This logic is implemented in the class FlowControlMixin
. Also StreamReaderProtocol
inherited from it.
- Create
StreamWriter
instance.
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
This class forwards the data passed to it using functions write()
, writelines()
and etc. to the underlying transport
.
protocol
is used to support the drain()
function to wait for the moment that the underlying transport has flushed its internal buffer and is available for writing again.
reader
is an optional parameter and can be None
, it is also used to support the drain()
function, at the start of this function it is checked if an exception was set for the reader, for example, due to a connection lost (relevant for sockets and bidirectional connections), then drain()
will also throw an exception.
You can read more about StreamWriter
and drain()
function in this great answer.
Update 2:
To read lines with \r\n
separator readuntil can be used
这篇关于python asyncio如何读取StdIn并写入StdOut?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!