为什么要显式调用 asyncio.StreamWriter.drain? [英] Why should asyncio.StreamWriter.drain be explicitly called?

查看:63
本文介绍了为什么要显式调用 asyncio.StreamWriter.drain?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

来自doc:

<块引用>

写入(数据)

将数据写入流.此方法不受流量控制.对 write() 的调用应该跟在 drain() 之后.

协程排空()

等到适合恢复写入流.例子:writer.write(数据)等待 writer.drain()

据我所知,

  • 每次调用write时都需要调用drain.
  • 如果不是我猜的,write 会阻塞循环线程

那为什么写的不是自动调用的协程呢?为什么要调用 write 而不必排水?我能想到两种情况

  1. 您想立即编写关闭
  2. 您必须在消息完成之前缓冲一些数据.

第一个是特殊情况,我认为我们可以有不同的 API.缓冲应该在写函数内部处理,应用程序不应该关心.


让我换个说法.这样做的缺点是什么?python3.8版本是否有效地做到了这一点?

async def awrite(writer, data):writer.write(数据)等待 writer.drain()

注意:drain 文档明确说明如下:

<块引用>

当没有什么可以等待时,drain() 立即返回.


再次阅读答案和链接,我认为这些功能是这样工作的.注意:检查接受的答案以获得更准确的版本.

def write(data):剩余 = socket.try_write(data)如果剩余:_pendingbuffer.append(remaining) # 如果另一端很慢并且我们有很多数据,缓冲区会继续增长异步定义排水():如果 len(_pendingbuffer) 

那么什么时候使用什么:

  1. 当数据不连续时,比如响应 HTTP 请求.我们只需要发送一些数据而不关心它何时到达并且内存不是问题 - 只需使用 write
  2. 同上,但内存是一个问题,使用awrite
  3. 将数据流式传输到大量客户端时(例如某些实时流或大文件).如果数据在每个连接的缓冲区中重复,它肯定会溢出 RAM.在这种情况下,编写一个循环,每次迭代都会获取一个数据块并调用 awrite.如果文件很大,loop.sendfile 最好(如果可用).

解决方案

据我了解,(1) 每次调用 write 时都需要调用 drain.(2)如果不是我猜的,write会阻塞循环线程

两者都不正确,但这种混淆是可以理解的.write() 的工作方式如下:

  • write() 的调用只是将数据存储到缓冲区中,将其留给事件循环以在稍后实际将其写出,而无需程序进一步干预.就应用程序而言,数据在后台写入的速度与另一方能够接收的速度一样快.换句话说,每个 write() 将使用尽可能多的操作系统级写入来安排要传输的数据,这些写入在相应的文件描述符实际可写时发出.所有这一切都会自动发生,甚至无需等待 drain().

  • write() 不是协程,它绝对从不阻塞事件循环.

第二个属性听起来很方便 - 您可以在任何需要的地方调用 write(),甚至可以从不是 async def 的函数中调用 - 但它实际上是一个主要的 write() 的>缺陷.由流 API 公开的写入与接受数据的操作系统完全分离,因此如果您写入数据的速度比对等方读取数据的速度快,则内部缓冲区将继续增长,并且您将拥有一个 内存泄漏手.drain() 解决了这个问题:如果写入缓冲区太大,等待它会暂停协程,并在 os.write() 在后台成功,缓冲区缩小.

您不需要在每次写入后等待 drain(),但您确实需要偶尔等待它,通常在 write() 被调用.例如:

虽然为真:响应 = 等待 peer1.readline()peer2.write(b'<响应>')peer2.write(响应)peer2.write(b'</response>')等待 peer2.drain()

drain() 如果待处理的未写入数据量很小,则立即返回.如果数据超过高阈值,drain() 将挂起调用协程,直到待处理的未写入数据量降至低阈值以下.暂停将导致协程停止从 peer1 读取,这反过来又会导致对等方降低向我们发送数据的速度.这种反馈被称为背压.

<块引用>

缓冲应该在写函数内部处理,应用程序不应该关心.

这几乎是 write() 现在的工作方式 - 它确实处理缓冲并且它让应用程序不管好坏.另请参阅此答案了解更多信息.

<小时>解决问题的编辑部分:<块引用>

再次阅读答案和链接,我认为这些功能是这样工作的.

write() 仍然比那聪明一点.它不会尝试只写一次,它实际上会安排数据继续写入,直到没有数据可写为止.即使您从不等待 drain(),这也会发生 - 应用程序唯一要做的就是让事件循环运行足够长的时间以将所有内容都写出来.

更正确的 writedrain 伪代码可能如下所示:

类 ToyWriter:def __init__(self):self._buf = bytearray()self._empty = asyncio.Event(True)定义写入(自我,数据):self._buf.extend(数据)loop.add_writer(self._fd, self._do_write)self._empty.clear()def _do_write(self):# 事件循环时自动调用# 文件描述符是可写的,无论是否# 任何人调用drain()而 self._buf:尝试:nwritten = os.write(self._fd, self._buf)除了作为 e 的 OSError:如果 e.errno == errno.EWOULDBLOCK:return # 再次可写后继续增加self._buf = self._buf[nwritten:]self._empty.set()loop.remove_writer(self._fd, self._do_write)异步 def 排水(自我):如果 len(self._buf) >64*1024:等待 self._empty.wait()

实际实现更复杂,因为:

  • 它写在 Twisted 风格的transport/protocol 层,带有自己复杂的 流控制,而不是在os.write之上;
  • drain() 不会真正等到缓冲区为空,而是等到它到达 低水印;
  • _do_write 中引发的 EWOULDBLOCK 以外的异常被存储并在 drain() 中重新引发.

最后一点是另一个调用 drain() 的好理由 - 实际上注意到对等点因为写入失败而消失了.>

From doc:

write(data)

Write data to the stream.

This method is not subject to flow control. Calls to write() should be followed by drain().

coroutine drain()

Wait until it is appropriate to resume writing to the stream. Example:

writer.write(data)
await writer.drain()

From what I understand,

  • You need to call drain every time write is called.
  • If not I guess, write will block the loop thread

Then why is write not a coroutine that calls it automatically? Why would one call write without having to drain? I can think of two cases

  1. You want to write and close immediately
  2. You have to buffer some data before the message it is complete.

First one is a special case, I think we can have a different API. Buffering should be handled inside write function and application should not care.


Let me put the question differently. What is the drawback of doing this? Does the python3.8 version effectively do this?

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()

Note: drain doc explicitly states the below:

When there is nothing to wait for, the drain() returns immediately.


Reading the answer and links again, I think the functions work like this. Note: Check accepted answer for more accurate version.

def write(data):
    remaining = socket.try_write(data)
    if remaining:
        _pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data

async def drain():
    if len(_pendingbuffer) < BUF_LIMIT:
        return
    await wait_until_other_side_is_up_to_speed()
    assert len(_pendingbuffer) < BUF_LIMIT

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()        

So when to use what:

  1. When the data is not continuous, Like responding to an HTTP request. We just need to send some data and don't care about when it is reached and memory is not a concern - Just use write
  2. Same as above but memory is a concern, use awrite
  3. When streaming data to a large number of clients (e.g. some live stream or a huge file). If the data is duplicated in each of the connection's buffers, it will definitely overflow RAM. In this case, write a loop that takes a chunk of data each iteration and call awrite. In case of a huge file, loop.sendfile is better if available.

解决方案

From what I understand, (1) You need to call drain every time write is called. (2) If not I guess, write will block the loop thread

Neither is correct, but the confusion is quite understandable. The way write() works is as follows:

  • A call to write() just stashes the data to a buffer, leaving it to the event loop to actually write it out at a later time, and without further intervention by the program. As far as the application is concerned, the data is written in the background as fast as the other side is capable of receiving it. In other words, each write() will schedule its data to be transferred using as many OS-level writes as it takes, with those writes issued when the corresponding file descriptor is actually writable. All this happens automatically, even without ever awaiting drain().

  • write() is not a coroutine, and it absolutely never blocks the event loop.

The second property sounds convenient - you can call write() wherever you need to, even from a function that's not async def - but it's actually a major flaw of write(). Writing as exposed by the streams API is completely decoupled from the OS accepting the data, so if you write data faster than your peer can read it, the internal buffer will keep growing and you'll have a memory leak on your hands. drain() fixes that problem: awaiting it pauses the coroutine if the write buffer is too large, and resumes it again once the os.write()'s performed in the background are successful and the buffer shrinks.

You don't need to await drain() after every write, but you do need to await it occasionally, typically between iterations of a loop in which write() is invoked. For example:

while True:
    response = await peer1.readline()
    peer2.write(b'<response>')
    peer2.write(response)
    peer2.write(b'</response>')
    await peer2.drain()

drain() returns immediately if the amount of pending unwritten data is small. If the data exceeds a high threshold, drain() will suspend the calling coroutine until the amount of pending unwritten data drops beneath a low threshold. The pause will cause the coroutine to stop reading from peer1, which will in turn cause the peer to slow down the rate at which it sends us data. This kind of feedback is referred to as back-pressure.

Buffering should be handled inside write function and application should not care.

That is pretty much how write() works now - it does handle buffering and it lets the application not care, for better or worse. Also see this answer for additional info.


Addressing the edited part of the question:

Reading the answer and links again, I think the the functions work like this.

write() is still a bit smarter than that. It won't try to write only once, it will actually arrange for data to continue to be written until there is no data left to write. This will happen even if you never await drain() - the only thing the application must do is let the event loop run its course for long enough to write everything out.

A more correct pseudo code of write and drain might look like this:

class ToyWriter:
    def __init__(self):
        self._buf = bytearray()
        self._empty = asyncio.Event(True)

    def write(self, data):
        self._buf.extend(data)
        loop.add_writer(self._fd, self._do_write)
        self._empty.clear()

    def _do_write(self):
        # Automatically invoked by the event loop when the
        # file descriptor is writable, regardless of whether
        # anyone calls drain()
        while self._buf:
            try:
                nwritten = os.write(self._fd, self._buf)
            except OSError as e:
                if e.errno == errno.EWOULDBLOCK:
                    return  # continue once we're writable again
                raise
            self._buf = self._buf[nwritten:]
        self._empty.set()
        loop.remove_writer(self._fd, self._do_write)

    async def drain(self):
        if len(self._buf) > 64*1024:
            await self._empty.wait()

The actual implementation is more complicated because:

  • it's written on top of a Twisted-style transport/protocol layer with its own sophisticated flow control, not on top of os.write;
  • drain() doesn't really wait until the buffer is empty, but until it reaches a low watermark;
  • exceptions other than EWOULDBLOCK raised in _do_write are stored and re-raised in drain().

The last point is another good reason to call drain() - to actually notice that the peer is gone by the fact that writing to it is failing.

这篇关于为什么要显式调用 asyncio.StreamWriter.drain?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆