python-如何将C函数实现为可等待的(协程) [英] python - how to implement a C-function as awaitable (coroutine)
问题描述
环境:C和micropython虚拟机中的协作RTOS是任务之一.
Environment: cooperative RTOS in C and micropython virtual machine is one of the tasks.
为使VM不阻止其他RTOS任务,我在RTOS_sleep() ="noreferrer"> vm.c:DISPATCH()
,以便在执行每个字节码后,VM会将控制权交给下一个RTOS任务.
To make the VM not block the other RTOS tasks, I insert RTOS_sleep()
in vm.c:DISPATCH()
so that after every bytecode is executed, the VM relinquishes control to the next RTOS task.
我创建了一个uPy接口,以使用生产者-消费者设计模式从物理数据总线(可能是CAN,SPI,以太网)异步获取数据.
I created a uPy interface to asynchronously obtain data from a physical data bus - could be CAN, SPI, ethernet - using producer-consumer design pattern.
在uPy中的用法:
can_q = CANbus.queue()
message = can_q.get()
在C中的实现是,can_q.get()
不会阻塞RTOS:它轮询C队列,如果未接收到消息,它将调用RTOS_sleep()
使另一个任务有机会填充队列.事情是同步的,因为C队列仅由另一个RTOS任务更新,并且RTOS任务仅在调用RTOS_sleep()
即 cooperative
The implementation in C is such that can_q.get()
does NOT block the RTOS: it polls a C-queue and if message is not received, it calls RTOS_sleep()
to give another task the chance to fill the queue. Things are synchronized because the C-queue is only updated by another RTOS task and RTOS tasks only switch when RTOS_sleep()
is called i.e. cooperative
C实现基本上是:
// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep();
return c_queue_get_message();
尽管Python语句can_q.get()
不会阻止RTOS,但是会阻止uPy脚本.
我想重写它,以便可以与async def
即协程一起使用,并且不阻塞uPy脚本.
Although the Python statement can_q.get()
does not block the RTOS, it does block the uPy script.
I'd like to rewrite it so I can use it with async def
i.e. coroutine and have it not block the uPy script.
不确定语法,但是类似这样:>
Not sure of the syntax but something like this:
can_q = CANbus.queue()
message = await can_q.get()
问题
我如何编写C函数,以便可以在其上await
?
How do I write a C-function so I can await
on it?
我希望使用CPython和micropython答案,但我只接受CPython答案.
I would prefer a CPython and micropython answer but I would accept a CPython-only answer.
推荐答案
注意:此答案涵盖了CPython和asyncio框架.但是,这些概念应适用于其他Python实现以及其他异步框架.
Note: this answer covers CPython and the asyncio framework. The concepts, however, should apply to other Python implementations as well as other async frameworks.
我如何编写C函数,以便可以在其上
await
?
编写可等待结果的C函数的最简单方法是让它返回一个已经制成的等待对象,例如
The simplest way to write a C function whose result can be awaited is by having it return an already made awaitable object, such as an asyncio.Future
. Before returning the Future
, the code must arrange for the future's result to be set by some asynchronous mechanism. All of these coroutine-based approaches assume that your program is running under some event loop that knows how to schedule the coroutines.
但是返回未来并不总是足够的-也许我们想定义一个具有任意数量的悬挂点的对象.返回未来仅暂停一次(如果返回的未来未完成),一旦完成未来就恢复,仅此而已.等效于包含多个await
的async def
的等待对象不能通过返回future来实现,它必须实现协程通常实现的协议.这有点像实现自定义__next__
的迭代器,并且可以代替生成器使用.
But returning a future isn't always enough - maybe we'd like to define an object with an arbitrary number of suspension points. Returning a future suspends only once (if the returned future is not complete), resumes once the future is completed, and that's it. An awaitable object equivalent to an async def
that contains more than one await
cannot be implemented by returning a future, it has to implement a protocol that coroutines normally implement. This is somewhat like an iterator implementing a custom __next__
and be used instead of a generator.
To define our own awaitable type, we can turn to PEP 492, which specifies exactly which objects can be passed to await
. Other than Python functions defined with async def
, user-defined types can make objects awaitable by defining the __await__
special method, which Python/C maps to the tp_as_async.am_await
part of the PyTypeObject
struct.
这意味着在Python/C中,您必须执行以下操作:
What this means is that in Python/C, you must do the following:
- 为
tp_as_async
您的扩展程序类型的字段. - 具有其
am_await
成员指向接受您类型实例并返回实现迭代器协议,即定义tp_iter
(通常定义为PyIter_Self
)和
- specify a non-NULL value for the
tp_as_async
field of your extension type. - have its
am_await
member point to a C function that accepts an instance of your type and returns an instance of another extension type that implements the iterator protocol, i.e. definestp_iter
(trivially defined asPyIter_Self
) andtp_iternext
. - the iterator's
tp_iternext
must advance the coroutine's state machine. Each non-exceptional return fromtp_iternext
corresponds to a suspension, and the finalStopIteration
exception signifies the final return from the coroutine. The return value is stored in thevalue
property ofStopIteration
.
为使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定暂停后何时恢复.由asyncio定义的大多数协程都期望在asyncio事件循环下运行,并且在内部使用asyncio.get_event_loop()
(和/或接受显式的loop
参数)来获取其服务.
For the coroutine to be useful, it must also be able to communicate with the event loop that drives it, so that it can specify when it is to be resumed after it has suspended. Most of coroutines defined by asyncio expect to be running under the asyncio event loop, and internally use asyncio.get_event_loop()
(and/or accept an explicit loop
argument) to obtain its services.
为说明Python/C代码需要实现的内容,让我们考虑用Python async def
表示的简单协程,例如asyncio.sleep()
的等价物:
To illustrate what the Python/C code needs to implement, let's consider simple coroutine expressed as a Python async def
, such as this equivalent of asyncio.sleep()
:
async def my_sleep(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
await future
# we get back here after the timeout has elapsed, and
# immediately return
my_sleep
创建一个 Future
,安排它在 n 秒内完成(其结果将被设置),并暂停自身直到将来完成.最后一部分使用await
,其中await x
表示允许x
决定我们现在将暂停还是继续执行".不完整的将来总是决定暂停,而异步Task
协程驱动程序的特殊情况产生了将来无限期地暂停它们并将它们的完成与恢复任务联系起来的期货.其他事件循环(curio等)的挂起机制在细节上可能有所不同,但是其基本思想是相同的:await
是可选的执行挂起.
my_sleep
creates a Future
, arranges for it to complete (its result to become set) in n seconds, and suspends itself until the future completes. The last part uses await
, where await x
means "allow x
to decide whether we will now suspend or keep executing". An incomplete future always decides to suspend, and the asyncio Task
coroutine driver special-cases yielded futures to suspend them indefinitely and connects their completion to resuming the task. Suspension mechanisms of other event loops (curio etc) can differ in details, but the underlying idea is the same: await
is an optional suspension of execution.
要将其转换为C,我们必须摆脱神奇的async def
函数定义以及await
悬挂点.删除async def
非常简单:等效的普通函数只需要返回一个实现__await__
的对象:
To translate this to C, we have to get rid of the magic async def
function definition, as well as of the await
suspension point. Removing the async def
is fairly simple: the equivalent ordinary function simply needs to return an object that implements __await__
:
def my_sleep(n):
return _MySleep(n)
class _MySleep:
def __init__(self, n):
self.n = n
def __await__(self):
return _MySleepIter(self.n)
my_sleep()
返回的_MySleep
对象的__await__
方法将由await
运算符自动调用,以将 awaitable 对象(任何传递给await
的对象)转换为一个迭代器.该迭代器将用于询问等待的对象是选择暂停还是提供值.就像for o in x
语句调用x.__iter__()
将 iterable x
转换为具体的 iterator 一样.
The __await__
method of the _MySleep
object returned by my_sleep()
will be automatically called by the await
operator to convert an awaitable object (anything passed to await
) to an iterator. This iterator will be used to ask the awaited object whether it chooses to suspend or to provide a value. This is much like how the for o in x
statement calls x.__iter__()
to convert the iterable x
to a concrete iterator.
当返回的迭代器选择暂停时,只需要产生一个值即可.值的含义(如果有的话)将由协程驱动程序解释,通常是事件循环的一部分.当迭代器选择停止执行并从await
返回时,它需要停止迭代.使用生成器作为便捷的迭代器实现,_MySleepIter
看起来像这样:
When the returned iterator chooses to suspend, it simply needs to produce a value. The meaning of the value, if any, will be interpreted by the coroutine driver, typically part of an event loop. When the iterator chooses to stop executing and return from await
, it needs to stop iterating. Using a generator as a convenience iterator implementation, _MySleepIter
would look like this:
def _MySleepIter(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
# yield from future.__await__()
for x in future.__await__():
yield x
由于await x
映射到yield from x.__await__()
,我们的生成器必须耗尽future.__await__()
返回的迭代器.如果将来未完成,则由Future.__await__
返回的迭代器将产生结果,否则返回未来的结果(我们在此忽略,但yield from
实际上提供了).
As await x
maps to yield from x.__await__()
, our generator must exhaust the iterator returned by future.__await__()
. The iterator returned by Future.__await__
will yield if the future is incomplete, and return the future's result (which we here ignore, but yield from
actually provides) otherwise.
在C中实现my_sleep
的C实现的最终障碍是_MySleepIter
的生成器的使用.幸运的是,任何生成器都可以转换为有状态的迭代器,其__next__
执行该代码段直到下一次等待或返回. __next__
实现生成器代码的状态机版本,其中yield
通过返回值表示,而return
通过升高StopIteration
表示.例如:
The final obstacle for a C implementation of my_sleep
in C is the use of generator for _MySleepIter
. Fortunately, any generator can be translated to a stateful iterator whose __next__
executes the piece of code up to the next await or return. __next__
implements a state machine version of the generator code, where yield
is expressed by returning a value, and return
by raising StopIteration
. For example:
class _MySleepIter:
def __init__(self, n):
self.n = n
self.state = 0
def __iter__(self): # an iterator has to define __iter__
return self
def __next__(self):
if self.state == 0:
loop = asyncio.get_event_loop()
self.future = loop.create_future()
loop.call_later(self.n, self.future.set_result, None)
self.state = 1
if self.state == 1:
if not self.future.done():
return next(iter(self.future))
self.state = 2
if self.state == 2:
raise StopIteration
raise AssertionError("invalid state")
翻译为C
以上内容是相当多的键入内容,但可以使用,并且仅使用可以通过本机Python/C函数定义的构造.
Translation to C
The above is quite some typing, but it works, and only uses constructs that can be defined with native Python/C functions.
实际上将这两个类转换为C非常简单,但超出了此答案的范围.
Actually translating the two classes to C quite straightforward, but beyond the scope of this answer.
这篇关于python-如何将C函数实现为可等待的(协程)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!