AWS IoT Python SDK和异步 [英] AWS IoT Python SDK and asyncio

查看:132
本文介绍了AWS IoT Python SDK和异步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要使用AWS IoT MQTT服务.我正在使用> https://github.com/aws/aws- iot-device-sdk-python 当前.

I need to use AWS IoT MQTT service. I am doing some experimenting with https://github.com/aws/aws-iot-device-sdk-python currently.

我的应用程序将使用websockets与另一个服务进行通信,然后发布/订阅MQTT主题以转发/接收消息.

My application will be using websockets to communicate with another service, and then publish / subscribe to MQTT topics to forward / receive messages.

该库是否有可能阻止代码执行?我仍然试图绕过asyncio,并且不确定我应该注意什么.我怎么知道它是否会引起问题?

Is it likely that this library will be blocking code execution? I still try to get my head around asyncio, and not sure what things I should be looking out for. How do I know if it will cause problems?

我相信我只需要使用这是我拥有的工作代码的摘录:

This is an extract from the working code I have:

class AWSIoTClient:

    def __init__():
        ...
        self.client = AWSIoTMQTTClient(...)

    def subscribe(self, callback):
        self.client.subscribe(f'{self.TOPIC}/subscribe/', 0, callback)

    def publish(self, message):
        self.client.publish(self.TOPIC, message, 0)


class MyWSProtocol(WebSocketClientProtocol):

    def set_aws_client(self, client: AWSIoTClient):
        client.subscribe(self.customCallback)
        self.client = client

    def customCallback(self, client, userdata, message):
        # This will be called when we send message from AWS
        if message.payload:
            message = json.loads(message.payload.decode('utf-8').replace("'", '"'))
            message['id'] = self.next_id()
            self.sendMessage(json.dumps(message).encode('utf-8'))

    def onMessage(self, payload, isBinary):
        message = json.loads(payload)

        # This will forward message to AWS
        self.client.publish(str(payload))

推荐答案

该库是否有可能阻止代码执行?

Is it likely that this library will be blocking code execution?

我怎么知道它是否会引起问题?

How do I know if it will cause problems?

您不应允许在任何协程中使用长时间运行的阻塞(同步)代码.这将导致阻塞您的全局事件循环,并进一步阻塞您在所有地方的所有协程.

You should not allow to having long-running blocking (synchronous) code inside any of your coroutines. It'll lead to blocking your global event loop and further blocking all of your coroutines everywhere.

async def main():
    await asyncio.sleep(3)  # async sleeping, it's ok

    time.sleep(3)           # synchronous sleeping, this freezes event loop 
                            # and all coroutines for 3 seconds, 
                            # you should avoid it!

    await asyncio.sleep(3)  # async sleeping, it's ok

如果您需要在协程内部运行阻塞代码,则应在执行程序中执行(在此处阅读有关它的信息).

If you need to run blocking code inside coroutine you should do it in executor (read here about it).

编写协程时应牢记这一点,但是如果启用

You should keep it in mind when you writing coroutines, but usually asyncio will warn you about this error if you'll enable debug mode:

import asyncio
import time


async def main():
    await asyncio.sleep(3)
    time.sleep(3)
    await asyncio.sleep(3)


loop = asyncio.get_event_loop()
loop.set_debug(True)  # debug mode
try:
    loop.run_until_complete(main())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

您会看到警告:

Executing <Handle <TaskWakeupMethWrapper object at 0x000002063C2521F8>(<Future finis...events.py:275>) created at C:\Users\gmn\AppData\Local\Programs\Python\Python36\Lib\asyncio\futures.py:348> took 3.000 seconds

这篇关于AWS IoT Python SDK和异步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆