将async-for与if条件打破中间等待的正确方法是什么? [英] What is the correct way to combine async-for with an if condition to break mid await?

查看:63
本文介绍了将async-for与if条件打破中间等待的正确方法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果我有一个协程正在消耗异步生成器中的项目,那么什么是最佳"?从外部条件终止该循环的方法?

If I have a coroutine that's consuming items from an async generator what is the "best" way to terminate that loop from an external condition?

考虑这个

while not self.shutdown_event.is_set():
    async with self.external_lib_client as client:
        async for message in client:
            if self.shutdown_event.is_set():
                break
            await self.handle(message)

如果我设置了 shutdown_event ,它将退出while循环,但是直到下一个 message async for 处理了之后环形.构造 a 迭代器的正确方法是什么,以使其在满足条件的情况下会短路,从而导致结果短路?

If I set shutdown_event it will break out of the while loop, but not until the next message has been handled by the async for loop. What is the correct way to structure the async for iterator such that it can short circuit if a condition has been met between it yielding results?

是否存在添加超时的标准方法?

Is there a standard way to add a Timeout?

推荐答案

一种方法是将迭代移至 async def 并使用取消:

One way would be to move the iteration to an async def and use cancelation:

async def iterate(client):
    async for message in client:
        # shield() because we want cancelation to cancel retrieval
        # of the next message, not ongoing handling of a message
        await asyncio.shield(self.handle(message))

async with self.external_lib_client as client:
    iter_task = asyncio.create_task(iterate(client))
    shutdown_task = asyncio.create_task(self.shutdown_event.wait())
    await asyncio.wait([iter_task, shutdown_task],
                       return_when=asyncio.FIRST_COMPLETED)
    if iter_task.done():
        # iteration has completed, access result to propagate the
        # exception if one was raised
        iter_task.result()
        shutdown_task.cancel()
    else:
        # shutdown was requested, cancel iteration
        iter_task.cancel()

另一种方法是将 shutdown_event 转换为单次异步流,并使用 aiostream 来同时监视两者.这样,当发出关闭事件信号时, for 循环会获得一个对象,并且可以中断循环而无需费心等待下一条消息:

Another way would be to turn shutdown_event into a one-shot async stream and use aiostream to monitor both. That way the for loop gets an object when the shutdown event is signaled and can break out of the loop without bothering to finish waiting for the next message:

# a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())

async with self.external_lib_client as client, \
        aiostream.stream.merge(done_stream, client).stream() as stream:
    async for message in stream:
        # the merged stream will provide a bogus value (whatever
        # `shutdown_event.wait()` returned) when the event is set,
        # so check that before using `message`:
        if self.shutdown_event.is_set():
            break
        await self.handle(message)

注意:由于问题中的代码不可运行,因此上述示例未经测试.

Note: since the code in the question is not runnable, the above examples are untested.

这篇关于将async-for与if条件打破中间等待的正确方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆