Why should asyncio.StreamWriter.drain be explicitly called?
From doc:
write(data)
Write data to the stream. This method is not subject to flow control. Calls to write() should be followed by drain().
coroutine drain()
Wait until it is appropriate to resume writing to the stream. Example:
writer.write(data)
await writer.drain()
From what I understand,
- You need to call drain every time write is called.
- If not, I guess write will block the loop thread.

Then why is write not a coroutine that calls it automatically? Why would one call write without having to drain? I can think of two cases:
- You want to write and close immediately.
- You have to buffer some data before the message is complete.
First one is a special case, I think we can have a different API. Buffering should be handled inside write function and application should not care.
Let me put the question differently. What is the drawback of doing this? Does the python3.8 version effectively do this?
async def awrite(writer, data):
    writer.write(data)
    await writer.drain()
Note: the drain doc explicitly states the below:
When there is nothing to wait for, the drain() returns immediately.
Reading the answer and links again, I think the functions work like this. Note: Check accepted answer for more accurate version.
def write(data):
    remaining = socket.try_write(data)
    if remaining:
        _pendingbuffer.append(remaining)  # Buffer will keep growing if other side is slow and we have a lot of data

async def drain():
    if len(_pendingbuffer) < BUF_LIMIT:
        return
    await wait_until_other_side_is_up_to_speed()
    assert len(_pendingbuffer) < BUF_LIMIT

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()
So when to use what:
- When the data is not continuous, like responding to an HTTP request: we just need to send some data and don't care when it arrives, and memory is not a concern - just use write.
- Same as above, but memory is a concern - use awrite.
- When streaming data to a large number of clients (e.g. some live stream or a huge file): if the data is duplicated in each of the connection's buffers, it will definitely overflow RAM. In this case, write a loop that takes a chunk of data each iteration and calls awrite. For a huge file, loop.sendfile is better if available.
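The streaming case above can be sketched as a chunked write loop. This is a hedged demo, not the canonical pattern: the 64 KiB chunk size, the localhost sink server, and the 1 MiB payload are all arbitrary choices for illustration.

```python
import asyncio

CHUNK = 64 * 1024  # arbitrary chunk size for the demo

async def stream_payload(writer, payload):
    # Send the payload one chunk per iteration, awaiting drain() after
    # each write so the internal buffer stays near asyncio's watermarks.
    for i in range(0, len(payload), CHUNK):
        writer.write(payload[i:i + CHUNK])
        await writer.drain()

async def main():
    total = 0
    done = asyncio.Event()

    async def sink(reader, writer):
        # Count every byte until the client sends EOF.
        nonlocal total
        while True:
            data = await reader.read(CHUNK)
            if not data:
                break
            total += len(data)
        writer.close()
        done.set()

    server = await asyncio.start_server(sink, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)

    await stream_payload(writer, b"x" * (1024 * 1024))  # 1 MiB
    writer.close()
    await writer.wait_closed()
    await done.wait()

    server.close()
    await server.wait_closed()
    return total

print(asyncio.run(main()))  # 1048576
```

Because drain() only suspends past the high watermark, the loop runs at full speed until the peer falls behind, then throttles automatically.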
From what I understand, (1) You need to call drain every time write is called. (2) If not I guess, write will block the loop thread
Neither is correct, but the confusion is quite understandable. The way write() works is as follows:
A call to write() just stashes the data to a buffer, leaving it to the event loop to actually write it out at a later time, without further intervention by the program. As far as the application is concerned, the data is written in the background as fast as the other side is capable of receiving it. In other words, each write() will schedule its data to be transferred using as many OS-level writes as it takes, with those writes issued when the corresponding file descriptor is actually writable. All this happens automatically, even without ever awaiting drain(). write() is not a coroutine, and it absolutely never blocks the event loop.
The second property sounds convenient - you can call write() wherever you need to, even from a function that's not async def - but it's actually a major flaw of write(). Writing as exposed by the streams API is completely decoupled from the OS accepting the data, so if you write data faster than your peer can read it, the internal buffer will keep growing and you'll have a memory leak on your hands. drain() fixes that problem: awaiting it pauses the coroutine if the write buffer is too large, and resumes it once the os.write()s performed in the background succeed and the buffer shrinks.
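The unbounded growth is easy to observe through the transport's write-buffer counter (get_write_buffer_size() is a real WriteTransport method; the deliberately sleepy peer and the 500 x 64 KiB write burst are assumptions made up for the demo):

```python
import asyncio

async def main():
    async def sleepy_peer(reader, writer):
        # A peer that never reads, so our writes cannot go anywhere.
        await asyncio.sleep(0.2)
        writer.close()

    server = await asyncio.start_server(sleepy_peer, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)

    # Fire off ~32 MiB of writes without ever awaiting drain(). The event
    # loop never gets a chance to flush, so the data piles up in asyncio's
    # internal buffer instead of leaving the process.
    for _ in range(500):
        writer.write(b"x" * 64 * 1024)
    buffered = writer.transport.get_write_buffer_size()

    writer.transport.abort()  # drop the pending data; we only wanted to measure
    server.close()
    await server.wait_closed()
    return buffered

print(asyncio.run(main()) > 0)  # True
```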
You don't need to await drain() after every write, but you do need to await it occasionally, typically between iterations of a loop in which write() is invoked. For example:
while True:
    response = await peer1.readline()
    peer2.write(b'<response>')
    peer2.write(response)
    peer2.write(b'</response>')
    await peer2.drain()
drain() returns immediately if the amount of pending unwritten data is small. If the data exceeds a high threshold, drain() will suspend the calling coroutine until the amount of pending unwritten data drops beneath a low threshold. The pause will cause the coroutine to stop reading from peer1, which will in turn cause the peer to slow down the rate at which it sends us data. This kind of feedback is referred to as back-pressure.
Buffering should be handled inside write function and application should not care.
That is pretty much how write() works now - it does handle buffering and it lets the application not care, for better or worse. Also see this answer for additional info.
Addressing the edited part of the question:
Reading the answer and links again, I think the functions work like this.
write() is still a bit smarter than that. It won't try to write only once; it will actually arrange for the data to continue to be written until there is no data left to write. This will happen even if you never await drain() - the only thing the application must do is let the event loop run for long enough to write everything out.
A more correct pseudo code of write and drain might look like this:
class ToyWriter:
    def __init__(self):
        self._buf = bytearray()
        self._empty = asyncio.Event()
        self._empty.set()  # buffer starts out empty

    def write(self, data):
        self._buf.extend(data)
        loop.add_writer(self._fd, self._do_write)
        self._empty.clear()

    def _do_write(self):
        # Automatically invoked by the event loop when the
        # file descriptor is writable, regardless of whether
        # anyone calls drain()
        while self._buf:
            try:
                nwritten = os.write(self._fd, self._buf)
            except OSError as e:
                if e.errno == errno.EWOULDBLOCK:
                    return  # continue once we're writable again
                raise
            self._buf = self._buf[nwritten:]
        self._empty.set()
        loop.remove_writer(self._fd)

    async def drain(self):
        if len(self._buf) > 64 * 1024:
            await self._empty.wait()
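The ToyWriter above is pseudocode (loop and self._fd are assumed to exist). A runnable Unix-only variant can be wired to a non-blocking os.pipe(); the 0.05 s sleep that stands in for "let the event loop run" is a demo shortcut:

```python
import asyncio
import os

class PipeToyWriter:
    def __init__(self, loop, fd):
        self._loop = loop
        self._fd = fd
        self._buf = bytearray()
        self._empty = asyncio.Event()
        self._empty.set()  # buffer starts out empty

    def write(self, data):
        self._buf.extend(data)
        self._loop.add_writer(self._fd, self._do_write)
        self._empty.clear()

    def _do_write(self):
        # Called by the event loop whenever the pipe is writable.
        while self._buf:
            try:
                nwritten = os.write(self._fd, self._buf)
            except BlockingIOError:
                return  # the loop will call us again when writable
            del self._buf[:nwritten]
        self._empty.set()
        self._loop.remove_writer(self._fd)

    async def drain(self):
        if len(self._buf) > 64 * 1024:
            await self._empty.wait()

async def main():
    rfd, wfd = os.pipe()
    os.set_blocking(rfd, False)
    os.set_blocking(wfd, False)

    writer = PipeToyWriter(asyncio.get_running_loop(), wfd)
    writer.write(b"hello")
    await writer.drain()       # returns immediately: buffer is tiny
    await asyncio.sleep(0.05)  # give the loop time to flush in the background

    data = os.read(rfd, 100)
    os.close(rfd)
    os.close(wfd)
    return data

print(asyncio.run(main()))  # b'hello'
```

Note that add_writer needs a selector-based event loop, so this sketch won't run on the Windows proactor loop.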
The actual implementation is more complicated because:
- it's written on top of a Twisted-style transport/protocol layer with its own sophisticated flow control, not on top of os.write;
- drain() doesn't really wait until the buffer is empty, but until it reaches a low watermark;
- exceptions other than EWOULDBLOCK raised in _do_write are stored and re-raised in drain().
The last point is another good reason to call drain() - to actually notice that the peer is gone by the fact that writing to it is failing.
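That failure-surfacing behavior is easy to reproduce: keep writing to a peer that has hung up and the error appears at the next drain(), not at write(). The loop of 100 writes below is just an assumption to make sure the broken connection is actually hit:

```python
import asyncio

async def main():
    async def hangup(reader, writer):
        writer.close()  # the peer disconnects immediately

    server = await asyncio.start_server(hangup, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    await asyncio.sleep(0.1)  # let the peer's close reach us

    try:
        for _ in range(100):
            writer.write(b"x" * 64 * 1024)  # write() itself never raises here
            await writer.drain()            # ...but drain() surfaces the failure
        outcome = "no error"
    except ConnectionError:
        outcome = "connection error"

    writer.close()
    server.close()
    await server.wait_closed()
    return outcome

print(asyncio.run(main()))  # connection error
```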