如何将 microbit 与 BLE 连接并监听按钮按下事件? [英] How do I connect microbit with BLE and listen for button press events?

查看:40
本文介绍了如何将 microbit 与 BLE 连接并监听按钮按下事件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

11/28/2021

如果您需要使用低功耗蓝牙将 microbit 连接到计算机,并在单击按钮时执行操作.直接跳转并按照下面

如果左侧菜单中没有蓝牙,请点击屏幕右上角的齿轮,选择Extensions,然后选择Bluetooth替换 Radio 扩展.

可以通过使用 pydbus 库来简化 RPi 上的 GATT 客户端代码- 总线绑定.

使用蓝牙,与其使用 while 循环并不断轮询 micro:bit,不如使用事件循环订阅来自(在本例中)Button A 特性的通知..每次按下或释放 micro:bit 按钮 A 时,都会调用我命名为 btn_handler 的函数.

下面的代码不会在 micro:bit 和 RPi 之间进行初始配对.由于配对是一次性配置步骤,我手动完成.

这是 RPi 的示例 Python 代码,它响应 micro:bit 上的按钮 A 被按下...

 from time import sleep导入pydbus从 gi.repository 导入 GLibDEVICE_ADDR = 'DE:82:35:E7:CE:BE' # micro:bit 地址BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'# DBus 对象路径BLUEZ_SERVICE = 'org.bluez'ADAPTER_PATH = '/org/bluez/hci0'device_path = f"{ADAPTER_PATH}/dev_{DEVICE_ADDR.replace(':', '_')}";#设置dbus总线 = pydbus.SystemBus()mngr = bus.get(BLUEZ_SERVICE, '/')适配器 = bus.get(BLUEZ_SERVICE, ADAPTER_PATH)设备 = bus.get(BLUEZ_SERVICE, device_path)设备连接()而不是 device.ServicesResolved:睡眠(0.5)def get_characteristic_path(dev_path, uuid):""""查找特征 UUID 的 DBus 路径"""mng_objs = mngr.GetManagedObjects()对于 mng_objs 中的路径:chr_uuid = mng_objs[path].get('org.bluez.GattCharacteristic1', {}).get('UUID')如果 path.startswith(dev_path) 和 chr_uuid == uuid.casefold():返回路径# 特征 DBus 信息btn_a_path = get_characteristic_path(device._path, BTN_A_STATE)btn_a = bus.get(BLUEZ_SERVICE, btn_a_path)# 在没有事件循环通知的情况下读取按钮 A打印(btn_a.ReadValue({}))# 为通知启用事件循环def btn_handler(iface, prop_changed, prop_removed):"通知按钮按下事件处理程序""如果 prop_changed 中的值":new_value = prop_changed['Value']打印(f按钮A状态:{new_value}")打印(f'作为字节:{bytes(new_value)}')打印(f'作为字节数组:{bytearray(新值)}')打印(f'作为整数:{int(new_value[0])}')打印(f'作为布尔值:{bool(new_value[0])}')mainloop = GLib.MainLoop()btn_a.onPropertiesChanged = btn_handlerbtn_a.StartNotify()尝试:mainloop.run()除了键盘中断:主循环.退出()btn_a.StopNotify()device.Disconnect()

11/28/2021 Edit :

If you need to connect your microbit to your computer using Bluetooth Low Energy, and do stuff when the button is clicked. Jump straight and follow @ukBaz's answer below.

Note: The solution will work perfectly on GNU/Linux, but maybe not so much on Windows.

Below is the original question of the post. I'm not going to edit it to hide my mistakes.


Summary: I have a microbit connected to a rpi-zero. I coded the microbit, when A button is pressed it will then send data through uart.write to the rpi-zero.

In this test, the microbit will uart.write("Test"), write a "Test" word to the rpi-zero.

My ultimate goal is to use rpi-zero's BLE capabilities to act as a control device with instructions sent from microbit buttons.

I found this GATT Server Code written in python for rpi. Which it ran with no problem at all.

The code below will be used for listening to microbit uart service and check whether if data received is "Test":

import serial

serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.5, stopbits=serial.STOPBITS_ONE)

serialString = " "

(serialPort.in_waiting > 0)

while True:

        serialString = serialPort.readline()

        if serialString == b'Test':
            print("Yes")
        else:
            print("F")

But the real problem is when I try to implement this loop code into the GATT server code.

I cannot seem to get my head around on how to pass this value to self.send_tx

Moreover, it seems that there is a global loop already in the GATT server code. So I tried to use threading to run both of the functions simultaneously but when I add self.send_tx("Test") it will just throw an error Self is not defined.

I'm sorry I'm a total noob to coding, does anyone know the possible fix to this? Thank you

Here's the full code:

import sys
import threading
import dbus, dbus.mainloop.glib
import serial
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb, register_ad_error_cb
from example_gatt_server import Service, Characteristic
from example_gatt_server import register_app_cb, register_app_error_cb

BLUEZ_SERVICE_NAME =           'org.bluez'
DBUS_OM_IFACE =                'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE =           'org.bluez.GattManager1'
GATT_CHRC_IFACE =              'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID =            '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHARACTERISTIC_UUID =  '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID =  '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
LOCAL_NAME =                   'rpi-gatt-server'
mainloop = None

serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.8, stopbits=serial.STOPBITS_ONE)

serialString = " "

(serialPort.in_waiting > 0)

class TxCharacteristic(Characteristic):
    def __init__(self, bus, index, service):
        Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,
                                ['notify'], service)
        self.notifying = False
        GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.on_console_input)

    def on_console_input(self, fd, condition):
        s = fd.readline()
        if s.isspace():
            pass
        else:
            self.send_tx(s)
        return True

    def send_tx(self, s):
        if not self.notifying:
            return
        value = []
        for c in s:
            value.append(dbus.Byte(c.encode()))
        self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])

    def StartNotify(self):
        if self.notifying:
            print("yes")
            return
        self.notifying = True

    def StopNotify(self):
        if not self.notifying:
            print("no")
            return
        self.notifying = False

class RxCharacteristic(Characteristic):
    def __init__(self, bus, index, service):
        Characteristic.__init__(self, bus, index, UART_RX_CHARACTERISTIC_UUID,
                                ['write'], service)

    def WriteValue(self, value, options):
        print('remote: {}'.format(bytearray(value).decode()))

class UartService(Service):
    def __init__(self, bus, index):
        Service.__init__(self, bus, index, UART_SERVICE_UUID, True)
        self.add_characteristic(TxCharacteristic(bus, 0, self))
        self.add_characteristic(RxCharacteristic(bus, 1, self))

class Application(dbus.service.Object):
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
        return response

class UartApplication(Application):
    def __init__(self, bus):
        Application.__init__(self, bus)
        self.add_service(UartService(bus, 0))

class UartAdvertisement(Advertisement):
    def __init__(self, bus, index):
        Advertisement.__init__(self, bus, index, 'peripheral')
        self.add_service_uuid(UART_SERVICE_UUID)
        self.add_local_name(LOCAL_NAME)
        self.include_tx_power = True

def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()
    for o, props in objects.items():
        if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
            return o
        print('Skip adapter:', o)
    return None

def check():
    while True:
        serialString = serialPort.readline()
        if serialString == b'Test':
            print("Okay, Test")
            self.send_tx("Test")
        else:
            print("No")

def main():
    global mainloop
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    adapter = find_adapter(bus)
    if not adapter:
        print('BLE adapter not found')
        return

    service_manager = dbus.Interface(
                                bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                GATT_MANAGER_IFACE)
    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    app = UartApplication(bus)
    adv = UartAdvertisement(bus, 0)

    mainloop = GLib.MainLoop()

    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)
    ad_manager.RegisterAdvertisement(adv.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        adv.Release()

if __name__ == '__main__':
    p1 = threading.Thread(target=main)
    p2 = threading.Thread(target=check)
    p1.start()
    p2.start()

解决方案

I think this might be an XY problem. What I have understood is that you want to send a button press from the micro:bit to the RPi via Bluetooth Low Energy (BLE). If that is the case, then there is a more efficient way to do this.

In the Bluetooth Profile for the micro:bit there is a button service and a button A status characteristic that can be used. Their UUIDs are:

BTN_SRV = 'E95D9882-251D-470A-A062-FA1922DFA9A8'
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'

You need to set up the GATT server on the micro:bit. The most efficient way to do this is using https://makecode.microbit.org/#editor to create the following:

If you don't have Bluetooth on the left-hand menu then click on the cog in the top right of the screen, select Extensions, and then select Bluetooth to replace the Radio extension.

The GATT client code on the RPi can be simplified by using the pydbus library for the D-Bus bindings.

With Bluetooth, rather than have a while loop and keep polling the micro:bit, it is more efficient to have an event loop that subscribes to notifications from (in this case) the Button A characteristic. The function I have named btn_handler is called each time the micro:bit button A is pressed or released.

The code below doesn't do the initial pairing between the micro:bit and the RPi. As pairing is a one-off provisioning step I do that manually.

Here is example python code for the RPi that responses to Button A on the micro:bit being pressed...

from time import sleep
import pydbus
from gi.repository import GLib

DEVICE_ADDR = 'DE:82:35:E7:CE:BE' #  micro:bit address
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'

# DBus object paths
BLUEZ_SERVICE = 'org.bluez'
ADAPTER_PATH = '/org/bluez/hci0'
device_path = f"{ADAPTER_PATH}/dev_{DEVICE_ADDR.replace(':', '_')}"

# setup dbus
bus = pydbus.SystemBus()
mngr = bus.get(BLUEZ_SERVICE, '/')
adapter = bus.get(BLUEZ_SERVICE, ADAPTER_PATH) 
device = bus.get(BLUEZ_SERVICE, device_path)

device.Connect()

while not device.ServicesResolved:
    sleep(0.5)

def get_characteristic_path(dev_path, uuid):
    """Look up DBus path for characteristic UUID"""
    mng_objs = mngr.GetManagedObjects()
    for path in mng_objs:
        chr_uuid = mng_objs[path].get('org.bluez.GattCharacteristic1', {}).get('UUID')
        if path.startswith(dev_path) and chr_uuid == uuid.casefold():
           return path

# Characteristic DBus information
btn_a_path = get_characteristic_path(device._path, BTN_A_STATE)
btn_a = bus.get(BLUEZ_SERVICE, btn_a_path)
# Read button A without event loop notifications
print(btn_a.ReadValue({}))

# Enable eventloop for notifications
def btn_handler(iface, prop_changed, prop_removed):
    """Notify event handler for button press"""
    if 'Value' in prop_changed:
        new_value = prop_changed['Value']
        print(f"Button A state: {new_value}")
        print(f'As byte: {bytes(new_value)}')
        print(f'As bytearray: {bytearray(new_value)}')
        print(f'As int: {int(new_value[0])}')
        print(f'As bool: {bool(new_value[0])}')

mainloop = GLib.MainLoop()
btn_a.onPropertiesChanged = btn_handler
btn_a.StartNotify()
try:
    mainloop.run()
except KeyboardInterrupt:
    mainloop.quit()
    btn_a.StopNotify()
    device.Disconnect()

这篇关于如何将 microbit 与 BLE 连接并监听按钮按下事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆