如何将MQTT Mosquitto发布事件存储到MySQL中? [英] How to store MQTT Mosquitto publish events into MySQL?

查看:1278
本文介绍了如何将MQTT Mosquitto发布事件存储到MySQL中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经连接了一个与mosquitto MQTT服务器(RPi)进行通信的设备,并正在向指定主题发送出版物.我现在要做的是将在该主题上发布的消息在MQTT服务器上存储到MySQL数据库中.我知道MySQL的工作原理,但我不知道如何收听这些即将来临的出版物.我正在寻找在后台运行的轻量级解决方案.欢迎使用任何有关库的指针或想法.

I've connected a device that communicates to my mosquitto MQTT server (RPi) and is sending out publications to a specified topic. What I want to do now is to store the messages published on that topic on the MQTT server into a MySQL database. I know how MySQL works, but I don't know how to listen for these incoming publications. I'm looking for a light-weight solution that runs in the background. Any pointers or ideas on libraries to use are very welcome.

推荐答案

最近几天我做了类似的事情:

I've done something similar in the last days:

  • 使用pywws实时收集气象站数据
  • 使用pywws.service.mqtt发布到mqtt-Broker
  • NAS上的
  • python-script 收集数据并写入MariaDB
  • live-collecting weatherstation-data with pywws
  • publishing with pywws.service.mqtt to mqtt-Broker
  • python-script on NAS collecting the data and writing to MariaDB
#!/usr/bin/python -u

import mysql.connector as mariadb
import paho.mqtt.client as mqtt
import ssl

mariadb_connection = mariadb.connect(user='USER', password='PW', database='MYDB')
cursor = mariadb_connection.cursor()

# MQTT Settings 
MQTT_Broker = "192.XXX.XXX.XXX"
MQTT_Port = 8883
Keep_Alive_Interval = 60
MQTT_Topic = "/weather/pywws/#"

# Subscribe
def on_connect(client, userdata, flags, rc):
  mqttc.subscribe(MQTT_Topic, 0)

def on_message(mosq, obj, msg):
  # Prepare Data, separate columns and values
  msg_clear = msg.payload.translate(None, '{}""').split(", ")
  msg_dict =    {}
  for i in range(0, len(msg_clear)):
    msg_dict[msg_clear[i].split(": ")[0]] = msg_clear[i].split(": ")[1]

  # Prepare dynamic sql-statement
  placeholders = ', '.join(['%s'] * len(msg_dict))
  columns = ', '.join(msg_dict.keys())
  sql = "INSERT INTO pws ( %s ) VALUES ( %s )" % (columns, placeholders)

  # Save Data into DB Table
  try:
      cursor.execute(sql, msg_dict.values())
  except mariadb.Error as error:
      print("Error: {}".format(error))
  mariadb_connection.commit()

def on_subscribe(mosq, obj, mid, granted_qos):
  pass

mqttc = mqtt.Client()

# Assign event callbacks
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe

# Connect
mqttc.tls_set(ca_certs="ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
mqttc.connect(MQTT_Broker, int(MQTT_Port), int(Keep_Alive_Interval))

# Continue the network loop & close db-connection
mqttc.loop_forever()
mariadb_connection.close()

这篇关于如何将MQTT Mosquitto发布事件存储到MySQL中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆