How does asyncio actually work?


Question


This question is motivated by another question of mine: How to await in cdef?

There are tons of articles and blog posts on the web about asyncio, but they are all very superficial. I couldn't find any information about how asyncio is actually implemented, and what makes I/O asynchronous. I was trying to read the source code, but it's thousands of lines of not the highest-grade C code, a lot of which deals with auxiliary objects, but most crucially, it is hard to connect the Python syntax with the C code it would translate into.

Asyncio's own documentation is even less helpful. There's no information there about how it works, only some guidelines about how to use it, which are also sometimes misleading / very poorly written.

I'm familiar with Go's implementation of coroutines, and was kind of hoping that Python did the same thing. If that were the case, the code I came up with in the post linked above would have worked. Since it didn't, I'm now trying to figure out why. My best guess so far is as follows, please correct me where I'm wrong:

  1. Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine.
  2. Perhaps, async def is actually split into multiple methods by await statements, where the object on which these methods are called is able to keep track of the progress it has made through the execution so far.
  3. If the above is true, then, essentially, the execution of a coroutine boils down to some global manager (loop?) calling methods of the coroutine object.
  4. The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the currently executing method relinquishes control (hits an await statement).

In other words, here's my attempt at "desugaring" of some asyncio syntax into something more understandable:

import asyncio
import random

async def coro(name):
    print('before', name)
    await asyncio.sleep(0)
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):  # `coroutine` is a hypothetical base class
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__(self):
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not all(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct, then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended, with the I/O happening outside the interpreter? What exactly is meant by I/O? If my Python procedure calls the C open() procedure, which in turn sends an interrupt to the kernel, relinquishing control to it, how does the Python interpreter know about this and how is it able to continue running some other code while kernel code does the actual I/O, until it wakes up the Python procedure which originally sent the interrupt? How can the Python interpreter, in principle, be aware of this happening?

Solution

How does asyncio work?

Before answering this question we need to understand a few base terms; skip these if you already know any of them.

Generators

Generators are objects that allow us to suspend the execution of a Python function. User-defined generators are implemented using the keyword yield. By creating a normal function containing the yield keyword, we turn that function into a generator:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

As you can see, calling next() on the generator causes the interpreter to load test's frame and return the yielded value. Calling next() again causes the frame to be loaded into the interpreter stack again, where it continues and yields another value.

By the third time next() is called, our generator is finished, and StopIteration is raised.
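
A for loop drives a generator with this exact protocol. As a rough, runnable sketch (the while loop below is an illustrative desugaring, not CPython's actual implementation):

def test():
    yield 1
    yield 2

# `for value in test(): print(value)` is roughly sugar for:
gen = test()
while True:
    try:
        value = next(gen)   # resume the generator's frame
    except StopIteration:   # the generator finished
        break
    print(value)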

Communicating with a generator

A lesser-known feature of generators is that you can communicate with them using two methods: send() and throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Upon calling gen.send(), the value is passed into the generator and becomes the return value of the yield expression at which the generator was paused.

gen.throw(), on the other hand, allows throwing exceptions inside the generator, raised at the same spot where yield was called.

Returning values from generators

Returning a value from a generator results in the value being put inside the StopIteration exception. We can later recover the value from the exception and use it as needed.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Behold, a new keyword: yield from

Python 3.3 (PEP 380) came with the addition of a new keyword: yield from. What that keyword allows us to do is pass any next(), send() and throw() through to the innermost nested generator. If the inner generator returns a value, it is also the return value of yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

I've written an article to further elaborate on this topic.

Putting it all together

With the introduction of yield from in Python 3.3, we became able to create generators inside generators that, just like a tunnel, pass data back and forth between the innermost and the outermost generators. This has spawned a new meaning for generators - coroutines.

Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

async def inner():
    return 1

async def outer():
    await inner()
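
For comparison, here is a sketch of the same pair written the pre-3.5, generator-based way. The asyncio.coroutine decorator existed for exactly this purpose; note it was deprecated in Python 3.8 and removed in 3.11, so this only runs on older versions:

import asyncio

@asyncio.coroutine
def inner():
    return 1

@asyncio.coroutine
def outer():
    yield from inner()   # the pre-await way of delegating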

Like every iterator or generator that implements the __iter__() method, coroutines implement __await__(), which allows them to be resumed every time await coro is called.
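
You can see the generator machinery directly by driving a coroutine by hand; this minimal sketch uses nothing but the send()/StopIteration protocol described above:

async def add_one(x):
    return x + 1

coro = add_one(41)
try:
    coro.send(None)      # start it, just like next() on a generator
except StopIteration as exc:
    print(exc.value)     # 42 - the return value rides in StopIteration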

There's a nice sequence diagram inside the Python docs (https://docs.python.org/3.5/library/asyncio-task.html#example-chain-coroutines) that you should check out.

In asyncio, apart from coroutine functions, we have 2 important objects: tasks and futures.

Futures

Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

  1. PENDING - future does not have any result or exception set.
  2. CANCELLED - future was cancelled using fut.cancel()
  3. FINISHED - future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()

The result, just like you have guessed, can be either a Python object that will be returned, or an exception that may be raised.

Another important feature of future objects is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the future is done - whether it raised an exception or finished.
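
Here is a small, runnable demonstration of that lifecycle (the names are just for illustration):

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()           # state: PENDING
    fut.add_done_callback(lambda f: print('callback:', f.result()))
    fut.set_result('abc')                # state: FINISHED
    await asyncio.sleep(0)               # let the loop run the callback
    print('awaited:', await fut)         # a finished future hands back its result

asyncio.run(main())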

Tasks

Task objects are special futures, which wrap around coroutines and communicate with the innermost and outermost coroutines. Every time a coroutine awaits a future, the future is passed all the way back up to the task (just like in yield from), and the task receives it.

Next, the task binds itself to the future. It does so by calling add_done_callback() on the future. From now on, if the future is ever done - by being cancelled, passed an exception, or passed a Python object as a result - the task's callback will be called, and the task will rise back up to life.
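
A heavily stripped-down sketch of that bind-and-resume cycle might look like this (SimpleTask is hypothetical; the real asyncio.Task also handles cancellation, exceptions and scheduling through the loop):

class SimpleTask:
    def __init__(self, coro):
        self.coro = coro
        self._step(None)                 # start driving the coroutine

    def _step(self, value):
        try:
            fut = self.coro.send(value)  # run until the next await on a future
        except StopIteration:
            return                       # the coroutine finished
        # Bind to the future: wake up again once it completes.
        fut.add_done_callback(lambda f: self._step(f.result()))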

Asyncio

The final burning question we must answer is - how is the IO implemented?

Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon receiving data it wakes up, and returns the sockets which received data, or the sockets which are ready for writing.
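
A self-contained demonstration of select, using a connected socket pair so the call has something to report:

import select
import socket

a, b = socket.socketpair()
b.send(b'hello')                         # makes `a` readable

# Block until `a` is ready for reading (or the 1 second timeout expires).
rlist, wlist, _ = select.select([a], [], [], 1.0)
for sock in rlist:
    print(sock.recv(4096))               # b'hello' - guaranteed not to block
a.close()
b.close()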

When you try to receive or send data over a socket through asyncio, what actually happens below is that asyncio first checks whether the socket has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered with the select function (by simply adding it to one of the lists, rlist for recv and wlist for send), and the appropriate function awaits a newly created future object, tied to that socket.

When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer has drained, asyncio checks for the future object tied to that socket, and sets it to done.

Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises back to life, and calls .send() on the coroutine, which resumes the innermost coroutine (because of the await chain), and you read the newly received data from a nearby buffer into which it was spilled.

The method chain again, in the case of recv() (a toy end-to-end version follows this list):

  1. select.select waits.
  2. A ready socket, with data, is returned.
  3. Data from the socket is moved into a buffer.
  4. future.set_result() is called.
  5. The task that added itself with add_done_callback() is now woken up.
  6. The task calls .send() on the coroutine, which goes all the way into the innermost coroutine and wakes it up.
  7. Data is read from the buffer and returned to our humble user.
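
To tie the steps together, here is a toy version of that chain with a hand-rolled future (ToyFuture is hypothetical and skips almost everything a real asyncio future does):

import select
import socket

class ToyFuture:
    # Just enough of a future: a result slot plus done-callbacks.
    def __init__(self):
        self.result = None
        self.callbacks = []
    def add_done_callback(self, cb):
        self.callbacks.append(cb)
    def set_result(self, value):
        self.result = value
        for cb in self.callbacks:
            cb(self)

a, b = socket.socketpair()
b.send(b'ping')

fut = ToyFuture()
fut.add_done_callback(lambda f: print('task woken with:', f.result))  # steps 5-7

rlist, _, _ = select.select([a], [], [])   # step 1: wait; step 2: `a` is ready
buffer = rlist[0].recv(4096)               # step 3: move data into a buffer
fut.set_result(buffer)                     # step 4: complete the future
a.close()
b.close()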

In summary, asyncio uses generator capabilities, which allow pausing and resuming functions. It uses yield from capabilities, which allow passing data back and forth between the innermost and the outermost generators. It uses all of those in order to halt a function's execution while it waits for IO to complete (using the OS select function).

And best of all? While one function is paused, another may run and interleave with the delicate fabric that is asyncio.
