在 asyncio 中的 KeyboardInterrupt 后等待任务完成 [英] Waiting for a task to complete after KeyboardInterrupt in asyncio

查看:99
本文介绍了在 asyncio 中的 KeyboardInterrupt 后等待任务完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图了解 asyncio 是如何工作的.在我的场景中,客户端与服务器建立 tcp 连接,发送登录字符串,如果经过身份验证 - 接收字符流.最后在 KeyboardInterrupt 上将 logoff 字符串发送到服务器并顺利断开连接.

I'm trying to understand how does asyncio work. In my scenario client makes a tcp connection to the server, sends a login string, if authenticated - receives a stream of chars. Finally on KeyboardInterrupt sends logoff string to the server and hapilly disconnects.

目前我被困在最后一部分,因为我的注销方法/任务在它有机会完成之前就被破坏了.

Currently I'm stuck on the final part as my logoff method/task is destroyed before it has a chance to complete.

^CTask was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "tst.py", line 101, in <module>
    client.login()
  File "tst.py", line 29, in login
    logoff_tsk = self.loop.create_task(self.logoff())
task: <Task pending coro=<logoff() running at tst.py:49> cb=[myClass._shutdown()] created at tst.py:29>

以下是产生此错误的代码:

Below is the code which produces this error:

from functools import partial
import asyncio

class myClass:
    def __init__(self, *a, **kw):
        self.transport = None
        self.protocol = None
        self.usr = str(kw.get("usr", ""))
        self.pwd = str(kw.get("pwd", ""))
        self.host = str(kw.get("host", "")) or "127.0.0.2"
        self.port = int(kw.get("port", 0)) or 5038
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(enabled=True)

    def reactor(self, recv):
        print("## ~/~ From reactor: {!r}".format(recv.decode()))

    def login(self):
        connection_tsk = self.loop.create_task(self.loop.create_connection(
            partial(
                myProtocol,
                reactor_func=self.reactor),
            host=self.host,
            port=self.port))
        connection_tsk.add_done_callback(self.set_transport_protocol)
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            logoff_tsk = self.loop.create_task(self.logoff())
            logoff_tsk.add_done_callback(self._shutdown)

    def set_transport_protocol(self, fut):
        print("AmiCtl.set_transport_protocol")
        transport, protocol = fut.result()
        self.transport = transport
        self.protocol = protocol
        self.loop.call_soon(self._login)

    def _login(self):
        login_string = self.cmd("Login")
        self.loop.create_task(self.transmit(login_string))

    @asyncio.coroutine
    def transmit(self, cmd):
        if self.transport:
            print("## ~/~ Sending data: {!r}".format(cmd))
            self.transport.write(cmd)

    @asyncio.coroutine
    def logoff(self):
        command = self.cmd("Logoff")
        yield from asyncio.shield(self.transmit(command))

    def _shutdown(self):
        if self.transport:
            self.transport.close()
        self.loop.stop()
        self.loop.close()
        print("\n{!r}".format(self.loop))

    def cmd(self, action):
        """
        Produce login/logoff string.
        """

class myProtocol(asyncio.Protocol):
    def __init__(self, reactor_func=None):
        self.reactor_func = reactor_func
        self.loop = asyncio.get_event_loop()

    def connection_made(self, transport):
        self.transport = transport
        peername = transport.get_extra_info("peername")
        print("## ~/~ Connection made: {peer}".format(peer=peername))

    def data_received(self, data):
        if callable(self.reactor_func):
            self.reactor_func(data)

    def connection_lost(self, exc):
        print("## ~/~ Lost connection to the server!!")
        self.loop.stop()


client = myClass(usr="my_usr", pwd="my_pwd")
client.login()

我该如何改进/修复我的代码?

How can I improve/fix my code please?

推荐答案

我不会参考你的代码,因为我认为你的问题是一个 xy 问题.(http://xyproblem.info),但我会尝试以更通用的方式回答.我读到的是您在询问如何正确关闭 asyncio 应用程序.

I will not refer to you code as I think your question is a xy-question. (http://xyproblem.info), But I will try to answer in a more generic way. What I am reading is that you are asking how to correctly close down an asyncio application.

我已经建立了一个小例子,我认为通过一些解释可以让你走上正轨.阅读下面的代码,我希望它可以与您的用例相关,我将在下面解释.

I have set up a small example that I think with some explanation will get you on the right track. Read the code below, my hope is that it can relate to your use case and I will explain below.

import asyncio
import signal

loop = asyncio.get_event_loop()


class Connector:

    def __init__(self):
        self.closing = False
        self.closed = asyncio.Future()
        task = loop.create_task(self.connection_with_client())
        task.add_done_callback(self.closed.set_result)


    async def connection_with_client(self):
        while not self.closing:
            print('Read/write to open connection')
            await asyncio.sleep(1)

        print('I will now close connection')
        await asyncio.sleep(1)


conn = Connector()


def stop(loop):
    conn.closing = True
    print("from here I will wait until connection_with_client has finished")
    conn.closed.add_done_callback(lambda _: loop.stop())

loop.add_signal_handler(signal.SIGINT, stop, loop)
loop.run_forever()
loop.close()

您要求的实际上并非微不足道,管理执行 asyncio 最困难的事情之一就是正确关闭 asyncio 应用程序.

What you are asking for is actually not trivial, one of the hardest things to manage doing asyncio is correctly closing an asyncio application.

要进行托管关闭,您必须有一个协程来处理关闭并将未来设置为完成,当它确保一切都关闭时.在我的例子中它是 task.add_done_callback(self.closed.set_result) 但这个未来可以通过其他方式设置.

To do managed closing you have to have a coroutine in place that will handle shutdown and set af future as done, when it has made sure that everything is shutdown. In my example it is task.add_done_callback(self.closed.set_result) but this future could be set in other ways.

其次,您必须添加一个信号处理程序,该处理程序将运行一个非异步(正常)函数并安排一个回调,在您的关闭未来"完成时触发循环关闭/停止.

Second, you have to add a signal handler that will run a non-asynchronously (normal) function and schedule a callback, that triggers the loop to close/stop when your 'close future' is done.

看看我的代码并玩弄它,直到你理解流程.我不怪你问,在我看来,做 asyncio 最困难的事情之一是跟踪松散"的协程,这将导致不正常关闭.

Take a look at my code and toy around with it until you understand the flow. I dont blame you for asking, in my opinion one of the hardest things doing asyncio is to keep track of 'loose' coroutines, that will result in unclean shutdown.

当我第一次做这个练习"并需要了解原理时,我浏览了https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L93-L95如果您花时间阅读代码,这些人会以非常漂亮的方式处理您的确切问题,我知道代码很复杂,但是请喝杯咖啡并遵循方法调用,直到它有意义为止.

When I did this 'exercise' my first time and needed to understand the principles I went though the code of https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L93-L95 If you take the time to read the code these guys handle your exact problem in a very beautiful way, I know the code is a but complex, but take a mug of coffee and follow the method calls around until it makes sense.

我希望这对您有所帮助,如果我不清楚,请发布更多详细信息或评论.并花时间了解这个话题(asycnio 关闭的东西).当您使用 asyncio 掌握关​​机管理时,您就不再是 asyncio-padawan 了 ;)

I hope this helps you, post more details or comments if I am unclear. And take you time to understand this topic (the asycnio shutdown things). When you master shutdown management with asyncio you are not a asyncio-padawan anymore ;)

您可能会认为,关闭干净需要大量代码,但据我所知,这是实现此目的的方法(对于更大、更复杂的应用程序来说,这或多或少是唯一的方法).

You may think, Thats alot of code to shutdown clean, but to my understanding, this is the way to do it(and more or less the only way for bigger and more complex applications).

祝你好运.

这篇关于在 asyncio 中的 KeyboardInterrupt 后等待任务完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆