使用 asyncio 读取串口的输出 [英] Using asyncio to read the output of a serial port

查看:101
本文介绍了使用 asyncio 读取串口的输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在做一些端口读取工作,我需要知道我是否应该使用 asyncIO 来读取从这个端口进来的数据.

I have been doing some port-reading for work, and I need to know if I should use asyncIO to read the data that comes in from this port.

这里有一些关于系统构建方式的细节.

Here's some details about the way in which the system is constructed.

  1. 同时读取多个传感器,并且可能产生同时进入的输出.(所有数据都通过连接到 USB 端口的探针 ZU10 传入)所有数据必须在传入后立即加上时间戳.

  1. Multiple sensors are being read at the same time and might produce output that goes in at the same time. (all data comes in through a probee ZU10 connected to a usb port) All data must be timestamped as soon as it comes in.

数据应该被处理,然后通过 REST API 发送到 django webapp.

The data is supposed to be processed and then sent to a django webapp through a REST API.

问题是,不丢失任何数据真的很重要,我问这个是因为我相信必须有一种比我想到的更简单的方法来做到这一点.

The thing Is, it's really important not to lose any data, I'm asking about this because I believe there must be an easier way to do this than the one I'm thinking of.

我希望数据进入的方式是通过一个异步过程将数据放入队列并为其添加时间戳,这样就不会存在数据丢失的情况,并且时间戳可能不超过几分之一秒,这不是问题.

The way I want the data to come in is through an asyncronous process that takes in the data into a queue and timestamps it, this way there is no way in which loss of data is present, and timestamping may be no more than a few fractions of a second off, which is not a problem.

如果有人有任何想法,我会很感激他们

If anyone has got any ideas I would be thankful for them

这是我用来打开端口、接收数据的代码以及到目前为止我在实际读取有意义的数据方面所获得的信息.

Here's the code I'm using to open the port, take the data in and what I've got so far on the actually reading the meaningful data.

阅读部分:

import serial #for port opening
import sys #for exceptions

#

#configure the serial connections (the parameters differs on the device you are connecting to)

class Serializer: 
    def __init__(self, port, baudrate=9600, timeout=.1): 
        self.port = serial.Serial(port = port, baudrate=baudrate, 
        timeout=timeout, writeTimeout=timeout)

    def open(self): 
        ''' Abre Puerto Serial'''
        self.port.open()

    def close(self): 
        ''' Cierra Puerto Serial'''
        self.port.close() 

    def send(self, msg):
        ''' envía mensaje a dispositivo serial'''
        self.port.write(msg)

    def recv(self):
        ''' lee salidas del dispositivo serial '''
        return self.port.readline()

PORT = '/dev/ttyUSB0' #Esto puede necesitar cambiarse


# test main class made for testing
def main():
    test_port = Serializer(port = PORT)

    while True:
        print(test_port.recv())

if __name__ == "__main__":
    main()

还有一些我将用来过滤掉有意义的读取的东西(忍受它,它可能充满了可怕的错误,也许是一个可怕的正则表达式):

And a bit of what I'm going to be using to filter out the meaningful reads (bear with it, it might be full of awful errors and maybe a terrible RegEx):

import re
from Lector import ChecaPuertos
from Lector import entrada


patterns = [r'^{5}[0-9],2[0-9a-fA-F] $'] #pattern list

class IterPat:
    def __init__(self, lect, pat = patterns):
        self.pat = pat  # lista de patrones posibles para sensores
        self.lect = lect  # lectura siendo analizada
        self.patLen = len(pat)  #Largo de patrones

    def __iter__(self):
        self.iteracion = 0 #crea la variable a iterar.

    def __next__(self):
        '''
        Primero revisa si ya pasamos por todas las iteraciones posibles
        luego revisa si la iteración es la que pensabamos, de ser así regresa una
        tupla con el patrón correspondiente, y la lectura
        de otra forma para el valor de ser mostrado
        '''
        pattern = re.compile(self.pat[self.iteracion])
        comp = pattern.match(self.lect)
        if comp == True:
            re_value = (self.pattern, self.lect)
            return re_value
        else:
            self.iteración += 1

def main():
    puerto = ChecaPuertos.serial_ports()
    serial = entrada.Serializer(port = puerto[0])

    if serial != open:
        serial.open()
    while True:
        iter = IterPat()

     #This is incomplete right here.

推荐答案

我正在使用 asyncio 通过 pyserial 读/写串行端口.我让串行连接另一端的设备在准备接收负载时写入一个字节.Asyncio 监视该字节然后发送有效负载.它看起来像这样:

I am using asyncio to read/write a serial port with pyserial. I am having my device on the other end of the serial connection write a single byte when it is ready to receive a payload. Asyncio watches for that byte then sends the payload. It looks something like this:

serial_f = serial.Serial(port=dev, baudrate=BAUDRATE, timeout=2)

def write_serial():
    status = serial_f.read(1)
    serial_f.write(buffer)

loop = asyncio.get_event_loop()
loop.add_reader(serial_f.fileno(), write_serial)

这篇关于使用 asyncio 读取串口的输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆