在 asyncio 中的 KeyboardInterrupt 后等待任务完成 [英] Waiting for a task to complete after KeyboardInterrupt in asyncio
问题描述
我试图了解 asyncio
是如何工作的.在我的场景中,客户端与服务器建立 tcp
连接,发送登录字符串,如果经过身份验证 - 接收字符流.最后在 KeyboardInterrupt
上将 logoff
字符串发送到服务器并顺利断开连接.
I'm trying to understand how does asyncio
work. In my scenario client makes a tcp
connection to the server, sends a login string, if authenticated - receives a stream of chars. Finally on KeyboardInterrupt
sends logoff
string to the server and hapilly disconnects.
目前我被困在最后一部分,因为我的注销方法/任务在它有机会完成之前就被破坏了.
Currently I'm stuck on the final part as my logoff method/task is destroyed before it has a chance to complete.
^CTask was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
File "tst.py", line 101, in <module>
client.login()
File "tst.py", line 29, in login
logoff_tsk = self.loop.create_task(self.logoff())
task: <Task pending coro=<logoff() running at tst.py:49> cb=[myClass._shutdown()] created at tst.py:29>
以下是产生此错误的代码:
Below is the code which produces this error:
from functools import partial
import asyncio
class myClass:
def __init__(self, *a, **kw):
self.transport = None
self.protocol = None
self.usr = str(kw.get("usr", ""))
self.pwd = str(kw.get("pwd", ""))
self.host = str(kw.get("host", "")) or "127.0.0.2"
self.port = int(kw.get("port", 0)) or 5038
self.loop = asyncio.get_event_loop()
self.loop.set_debug(enabled=True)
def reactor(self, recv):
print("## ~/~ From reactor: {!r}".format(recv.decode()))
def login(self):
connection_tsk = self.loop.create_task(self.loop.create_connection(
partial(
myProtocol,
reactor_func=self.reactor),
host=self.host,
port=self.port))
connection_tsk.add_done_callback(self.set_transport_protocol)
try:
self.loop.run_forever()
except KeyboardInterrupt:
logoff_tsk = self.loop.create_task(self.logoff())
logoff_tsk.add_done_callback(self._shutdown)
def set_transport_protocol(self, fut):
print("AmiCtl.set_transport_protocol")
transport, protocol = fut.result()
self.transport = transport
self.protocol = protocol
self.loop.call_soon(self._login)
def _login(self):
login_string = self.cmd("Login")
self.loop.create_task(self.transmit(login_string))
@asyncio.coroutine
def transmit(self, cmd):
if self.transport:
print("## ~/~ Sending data: {!r}".format(cmd))
self.transport.write(cmd)
@asyncio.coroutine
def logoff(self):
command = self.cmd("Logoff")
yield from asyncio.shield(self.transmit(command))
def _shutdown(self):
if self.transport:
self.transport.close()
self.loop.stop()
self.loop.close()
print("\n{!r}".format(self.loop))
def cmd(self, action):
"""
Produce login/logoff string.
"""
class myProtocol(asyncio.Protocol):
def __init__(self, reactor_func=None):
self.reactor_func = reactor_func
self.loop = asyncio.get_event_loop()
def connection_made(self, transport):
self.transport = transport
peername = transport.get_extra_info("peername")
print("## ~/~ Connection made: {peer}".format(peer=peername))
def data_received(self, data):
if callable(self.reactor_func):
self.reactor_func(data)
def connection_lost(self, exc):
print("## ~/~ Lost connection to the server!!")
self.loop.stop()
client = myClass(usr="my_usr", pwd="my_pwd")
client.login()
我该如何改进/修复我的代码?
How can I improve/fix my code please?
推荐答案
我不会参考你的代码,因为我认为你的问题是一个 xy 问题.(http://xyproblem.info),但我会尝试以更通用的方式回答.我读到的是您在询问如何正确关闭 asyncio 应用程序.
I will not refer to you code as I think your question is a xy-question. (http://xyproblem.info), But I will try to answer in a more generic way. What I am reading is that you are asking how to correctly close down an asyncio application.
我已经建立了一个小例子,我认为通过一些解释可以让你走上正轨.阅读下面的代码,我希望它可以与您的用例相关,我将在下面解释.
I have set up a small example that I think with some explanation will get you on the right track. Read the code below, my hope is that it can relate to your use case and I will explain below.
import asyncio
import signal
loop = asyncio.get_event_loop()
class Connector:
def __init__(self):
self.closing = False
self.closed = asyncio.Future()
task = loop.create_task(self.connection_with_client())
task.add_done_callback(self.closed.set_result)
async def connection_with_client(self):
while not self.closing:
print('Read/write to open connection')
await asyncio.sleep(1)
print('I will now close connection')
await asyncio.sleep(1)
conn = Connector()
def stop(loop):
conn.closing = True
print("from here I will wait until connection_with_client has finished")
conn.closed.add_done_callback(lambda _: loop.stop())
loop.add_signal_handler(signal.SIGINT, stop, loop)
loop.run_forever()
loop.close()
您要求的实际上并非微不足道,管理执行 asyncio 最困难的事情之一就是正确关闭 asyncio 应用程序.
What you are asking for is actually not trivial, one of the hardest things to manage doing asyncio is correctly closing an asyncio application.
要进行托管关闭,您必须有一个协程来处理关闭并将未来设置为完成,当它确保一切都关闭时.在我的例子中它是 task.add_done_callback(self.closed.set_result)
但这个未来可以通过其他方式设置.
To do managed closing you have to have a coroutine in place that will handle shutdown and set af future as done, when it has made sure that everything is shutdown. In my example it is task.add_done_callback(self.closed.set_result)
but this future could be set in other ways.
其次,您必须添加一个信号处理程序,该处理程序将运行一个非异步(正常)函数并安排一个回调,在您的关闭未来"完成时触发循环关闭/停止.
Second, you have to add a signal handler that will run a non-asynchronously (normal) function and schedule a callback, that triggers the loop to close/stop when your 'close future' is done.
看看我的代码并玩弄它,直到你理解流程.我不怪你问,在我看来,做 asyncio 最困难的事情之一是跟踪松散"的协程,这将导致不正常关闭.
Take a look at my code and toy around with it until you understand the flow. I dont blame you for asking, in my opinion one of the hardest things doing asyncio is to keep track of 'loose' coroutines, that will result in unclean shutdown.
当我第一次做这个练习"并需要了解原理时,我浏览了https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L93-L95如果您花时间阅读代码,这些人会以非常漂亮的方式处理您的确切问题,我知道代码很复杂,但是请喝杯咖啡并遵循方法调用,直到它有意义为止.
When I did this 'exercise' my first time and needed to understand the principles I went though the code of https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L93-L95 If you take the time to read the code these guys handle your exact problem in a very beautiful way, I know the code is a but complex, but take a mug of coffee and follow the method calls around until it makes sense.
我希望这对您有所帮助,如果我不清楚,请发布更多详细信息或评论.并花时间了解这个话题(asycnio 关闭的东西).当您使用 asyncio 掌握关机管理时,您就不再是 asyncio-padawan 了 ;)
I hope this helps you, post more details or comments if I am unclear. And take you time to understand this topic (the asycnio shutdown things). When you master shutdown management with asyncio you are not a asyncio-padawan anymore ;)
您可能会认为,关闭干净需要大量代码,但据我所知,这是实现此目的的方法(对于更大、更复杂的应用程序来说,这或多或少是唯一的方法).
You may think, Thats alot of code to shutdown clean, but to my understanding, this is the way to do it(and more or less the only way for bigger and more complex applications).
祝你好运.
这篇关于在 asyncio 中的 KeyboardInterrupt 后等待任务完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!