带有异步的pyzmq REQ/REP等待变量 [英] pyzmq REQ/REP with asyncio await for variable

查看:108
本文介绍了带有异步的pyzmq REQ/REP等待变量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我第一次使用python中的asyncio进行游戏,并尝试将其与ZMQ结合使用.

I'm playing for the first time with asyncio in python and trying to combine it with ZMQ.

基本上,我的问题是我有一个REP/REQ系统,它位于 async def 中,具有我需要等待的功能.值如何不更新.这是一段代码来说明这一点:

Basically my issue is that I have a REP/REQ system, in an async def with a function I need to await. how the value is not updated. Here's a snippet of the code to illustrate that:

#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")

我将此对象发送给一个类,并通过此函数将其重新获取

I send this object to a class and get it back in this function

async def readsonar(self, trigger_pin, REP_server_django):
        i= 0
        while True:

            ping_from_view = await REP_server_django.recv()  # line.1
            value = await self.board.sonar_read(trigger_pin) # line.2
            print(value)                                     # line.3
            json_data = json.dumps(value)                    # line.4
            #json_data = json.dumps(i)                       # line.4bis
            REP_server_django.send(json_data.encode())       # line.5
            i+=1                                             # line.6
            await asyncio.sleep(1/1000)                      # line.7

,使用pymata_express读取超声波传感器.如果我评论 line.2 line.4 ,我会得到 i .如果我评论 line.1 line.5 print(value) sonar_read 打印正确的值.但是,当我按如下所示运行它时, value 不会更新.

the sonar_read, is using pymata_express to read an ultrasonic sensor. If I comment line.2 and line.4 I get the right value for i. If I comment line.1 and line.5 the print(value) prints the correct value from sonar_read. However, when I run it as shown here, the value is not updated.

我想念什么吗?


编辑有关行注释的类型.我的意思是,如果我只读声纳并打印该值.它工作正常.如果我只使用 .recv() .send(json.dumps(i).encode()),它就可以工作.但是,如果我尝试从声纳发送值.它锁定到给定的 ,该值不会更新

EDIT :
Edited a type regarding the line comments. What I meant is that if I only read the sonar and print the value. It works fine. If I only .recv() and .send(json.dumps(i).encode()), it works. But if I try to send the value from the sonar. It locks to a given value which is not updated

(对Alan Yorinks的答复):这是MWE,它考虑您发送的有关类中 zmq 声明的内容.它来自 pymata_express 示例concurrent_tasks.py

EDIT2 : (answer to Alan Yorinks): here is the MWE, it considers what you sent regarding the declaration of zmq in the class. It is taken from the pymata_express example concurrent_tasks.py

要重现该错误,请在两个不同的终端中运行这两个脚本.您将需要一个安装有 Frimata_express 的arduino板.如果一切顺利 PART A.应该只在 mve_req.py 端吐出相同的值.您可以编辑不同的块(PARTS A,B或C)以查看其行为.

To reproduce the error, run these two scripts in two different terminals. You will need an arduino board with Frimata_express installed. If all runs well, PART A. should only spit out the same value on the mve_req.py end. You may edit the diffrent blocks (PARTS A, B or C) to see the behaviour.

mve_rep.py

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress


class ConcurrentTasks:

    def __init__(self, board):


        self.loop = board.get_event_loop()
        self.board = board

        self.ctxsync = zmq.Context()
        self.context = zmq.asyncio.Context()
        self.rep = self.context.socket(zmq.REP)
        self.rep.bind("tcp://*:5558")

        self.trigger_pin = 53
        self.echo_pin = 51

        loop.run_until_complete(self.async_init_and_run())

    async def readsonar(self):
        i = 0
        while True:


            #PART. A. WHAT I HOPE COULD WORK
            rep_recv = await self.rep.recv()                       # line.1
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            # json_data = json.dumps(i)                            # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7


            '''
            #PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''

            '''
            #PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
            rep_recv = await self.rep.recv()                       # line.1
            json_data = json.dumps(i)                              # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''



    async def async_init_and_run(self):

        await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

        readsonar = asyncio.create_task(self.readsonar())
        await readsonar

        # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    my_board = PymataExpress()
    try:
        ConcurrentTasks(my_board)
    except (KeyboardInterrupt, RuntimeError):
        loop.run_until_complete(my_board.shutdown())
        print('goodbye')
    finally:
        loop.close()

mve_req.py

import zmq
import time
import json

def start_zmq():
    context = zmq.Context()
    REQ_django  = context.socket(zmq.REQ)
    REQ_django.connect("tcp://localhost:5558")

    return REQ_django, context

def get_sonar(REQ_django):
    REQ_django.send(b"server_django")
    ping_from_server_django = REQ_django.recv()
    return ping_from_server_django.decode()

if __name__ == '__main__':

    data = {"sensors":{}}

    REQ_django, context = start_zmq()
    while REQ_django:

            data['sensors']['sonar'] = get_sonar(REQ_django)
            json_data = json.dumps(data)
            print(data)

            #DO OTHER WORK
            time.sleep(1)

    REQ_django.close()
    context.term()

推荐答案

我完全是 python-banyan. OP要求我发布此解决方案,因此这并不意味着是一个无耻的插件.

In full disclosure, I am the author of pymata-express and python-banyan. The OP requested that I post this solution, so this is not meant to be a shameless plug.

自从异步3在Python 3中首次引入以来,我一直在使用它进行开发.当异步代码有效时,异步(IMHO)可以简化并发性和代码.但是,当出现问题时,调试和了解问题的原因可能会令人沮丧.

I have been developing with asyncio since it was first introduced in Python 3. When asyncio code works, asyncio (IMHO) can simplify concurrency and the code. However, when things go awry, it can be frustrating to debug and understand the cause of the issues.

我会提前致歉,因为这可能会有点冗长,但是我需要提供一些背景信息,以使该示例看起来不会像是一些随意的代码.

I apologize ahead of time, since this may be a little lengthy, but I need to provide some background information so that the example will not seem like some random bit of code.

开发了python-banyan框架,以提供线程,多处理和异步的替代方法.简而言之,Banyan应用程序由目标明确的小型可执行文件组成,这些可执行文件使用通过LAN共享的协议消息相互通信.它的核心使用Zeromq.它的设计目的不是让流量通过WAN传输,而是将LAN用作软件背板".在某些方面,Banyan与MQTT相似,但是在LAN中使用时,速度要快得多.如果需要,它确实具有连接到MQTT网络的功能.

The python-banyan framework was developed to provide an alternative to threading, multi-processing, and asyncio. Simply put, a Banyan application consists of small targeted executables that communicate with one another using protocol messages that are shared over a LAN. At its core it uses Zeromq. It was not designed to have traffic move over the WAN, but to use a LAN as a "software backplane." In some ways, Banyan is similar to MQTT, but it is much faster when used within a LAN. It does have the capability to connect to an MQTT network if that is desireable.

Banyan的一部分是一个称为OneGPIO的概念.它是一个协议消息传递规范,它抽象化了GPIO功能以独立于任何硬件实现.为了实现特定的硬件,开发了专用的Banyan组件,称为Banyan硬件网关.有适用于Raspberry Pi,Arduino,ESP-8266和Adafruit Crickit Hat的网关.GPIO应用程序发布通用的OneGPIO消息,任何或所有网关都可以选择接收该消息.要从一个硬件平台转移到另一个硬件平台,将启动与硬件相关的网关,并且无需进行修改即可启动控制组件(下面显示的代码).从一个硬件平台到另一个硬件平台,任何组件都不需要进行代码修改,而控制组件和网关都无需修改.启动控制组件时,可以通过命令行选项来指定诸如引脚号之类的变量.对于Arduino网关,使用pymata-express来控制Arduino的GPIO.Pymata-express是StandardFirmata客户端的异步实现.需要注意的是,下面的代码不是异步的.Banyan框架允许人们使用适合问题的工具进行开发,但允许解耦解决方案的各个部分,在这种情况下,该应用程序允许将asyncio与非asyncio混合使用,而不会在执行过程中通常遇到任何麻烦如此.

Part of Banyan is a concept called OneGPIO. It is a protocol messaging specification that abstracts GPIO functionality to be independent of any hardware implementation. To implement the hardware specifics, specialized Banyan components, called Banyan Hardware Gateways were developed. There are gateways available for the Raspberry Pi, Arduino, ESP-8266 and Adafruit Crickit Hat. A GPIO application publishes the generic OneGPIO messages that any or all of the gateways can elect to receive. To move from one hardware platform to another, the hardware associated gateway is launched, and without modification, the control component (which is the code shown below) is launched. To go from one hardware platform to another, there are no code modifications necessary for any of the components, neither the control component nor the gateway is modified. Variables, such as pin numbers may be specificied through command line options when launching the control component. For the Arduino Gateway, pymata-express is used to control the GPIO of the Arduino. Pymata-express is an asyncio implementation of a StandardFirmata client. The thing to note that the code below is not asyncio. The Banyan framework allows one to develop using the tools that fit the problem, yet allow decoupling of parts of the solution, and in this case, the application allows the mixing the of asyncio with non-asyncio without any of the headaches normally encountered in doing so.

在提供的代码中,类定义下的所有代码均用于提供对命令行配置选项的支持.

In the code provided, all the code below the class definition is used to provide support for command-line configuration options.

import argparse
import signal
import sys
import threading
import time

from python_banyan.banyan_base import BanyanBase


class HCSR04(BanyanBase, threading.Thread):
    def __init__(self, **kwargs):
        """
        kwargs contains the following parameters
        :param back_plane_ip_address: If none, the local IP address is used
        :param process_name: HCSR04
        :param publisher_port: publishing port
        :param subscriber_port: subscriber port
        :param loop_time: receive loop idle time
        :param trigger_pin: GPIO trigger pin number
        :param echo_pin: GPIO echo pin number
        """

        self.back_plane_ip_address = kwargs['back_plane_ip_address'],
        self.process_name = kwargs['process_name']
        self.publisher_port = kwargs['publisher_port']
        self.subscriber_port = kwargs['subscriber_port'],
        self.loop_time = kwargs['loop_time']
        self.trigger_pin = kwargs['trigger_pin']
        self.echo_pin = kwargs['echo_pin']
        self.poll_interval = kwargs['poll_interval']

        self.last_distance_value = 0

        # initialize the base class
        super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
                                     subscriber_port=kwargs['subscriber_port'],
                                     publisher_port=kwargs['publisher_port'],
                                     process_name=kwargs['process_name'],
                                     loop_time=kwargs['loop_time'])

        threading.Thread.__init__(self)
        self.daemon = True

        self.lock = threading.Lock()

        # subscribe to receive messages from arduino gateway
        self.set_subscriber_topic('from_arduino_gateway')

        # enable hc-sr04 in arduino gateway
        payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
                   'echo_pin': self.echo_pin}
        self.publish_payload(payload, 'to_arduino_gateway')

        # start the thread
        self.start()

        try:
            self.receive_loop()
        except KeyboardInterrupt:
            self.clean_up()
            sys.exit(0)

    def incoming_message_processing(self, topic, payload):
        print(topic, payload)
        with self.lock:
            self.last_distance_value = payload['value']

    def run(self):
        while True:
            with self.lock:
                distance = self.last_distance_value
            payload = {'distance': distance}
            topic = 'distance_poll'
            self.publish_payload(payload, topic)
            time.sleep(self.poll_interval)


def hcsr04():
    parser = argparse.ArgumentParser()
    # allow user to bypass the IP address auto-discovery.
    # This is necessary if the component resides on a computer
    # other than the computing running the backplane.
    parser.add_argument("-b", dest="back_plane_ip_address", default="None",
                        help="None or IP address used by Back Plane")
    parser.add_argument("-i", dest="poll_interval", default=1.0,
                        help="Distance polling interval")
    parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
                        help="Set process name in banner")
    parser.add_argument("-p", dest="publisher_port", default="43124",
                        help="Publisher IP port")
    parser.add_argument("-s", dest="subscriber_port", default="43125",
                        help="Subscriber IP port")
    parser.add_argument("-t", dest="loop_time", default=".1",
                        help="Event Loop Timer in seconds")
    parser.add_argument("-x", dest="trigger_pin", default="12",
                        help="Trigger GPIO pin number")
    parser.add_argument("-y", dest="echo_pin", default="13",
                        help="Echo GPIO pin number")

    args = parser.parse_args()

    if args.back_plane_ip_address == 'None':
        args.back_plane_ip_address = None
    kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
                  'publisher_port': args.publisher_port,
                  'subscriber_port': args.subscriber_port,
                  'process_name': args.process_name,
                  'loop_time': float(args.loop_time),
                  'trigger_pin': int(args.trigger_pin),
                  'echo_pin': int(args.echo_pin),
                  'poll_interval': int(args.poll_interval)
                  }

    # replace with the name of your class
    HCSR04(**kw_options)


# signal handler function called when Control-C occurs
def signal_handler(sig, frame):
    print('Exiting Through Signal Handler')
    raise KeyboardInterrupt


# listen for SIGINT
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    hcsr04()

这篇关于带有异步的pyzmq REQ/REP等待变量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆