如何通过发送新的写入命令来响应BLE特性通知 [英] How to respond to BLE characteristic notifications by sending a new write command

查看:155
本文介绍了如何通过发送新的写入命令来响应BLE特性通知的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在更新一个应用程序以使用RxAndroidBLE,并在如何将现有的回调模式转换为Rx模式方面苦苦挣扎.特别是,我需要根据接收到的数据以不同的方式响应特征通知,然后将特定的写命令发送回设备(这将导致循环更新特征).

I'm updating an app to use RxAndroidBLE, and struggling with how to translate my existing callback pattern into an Rx pattern. In particular, I need to respond to characteristic notifications in different ways depending on the received data, and send a specific write command back to the device (which will then cause the characteristic to be updated, in a loop).

其背后的理由是,我正在集成的BLE设备具有特殊的自定义特性,我们可以向其发送不同的命令,然后侦听数据.

The rationale behind this is that the BLE device I'm integrating with has a special custom characteristic, to which we can send different commands and then listen for data back.

我已经阅读了很多有关使用RxBLE进行链接的命令的信息,但似乎都没有解决我的特定查询的问题,这是如何在观察到更改通知时将命令发送回设备的(因为连接本身似乎已断开连接)到达可观察区域时的范围).这样做的接收方式"是什么?

I've read up lots about chaining commands using RxBLE, but none seem to address my particular query, which is how to send a command back to the device on observing a change notification (since the connection itself seems to be out of scope by the time we get to the observable block). What is the "Rx Way" of doing this?

为清楚起见,这是我的BLE服务的整个流程:

For clarity, this is the entire flow of my BLE service:

  1. 扫描具有自定义特征的过滤器的设备
  2. 连接到找到的设备
  3. 读取几个标准特征(字符串),并将其存储在我们的数据模型中
  4. 并且仅当其中一个特征与字符串数组之一匹配时,继续执行5.否则,请处置连接.
  5. 订阅我们的自定义控件"特征("CC")以获取更改通知
  6. 将命令1发送到CC.这应触发在CC中设置答案1,因此我们的处理程序称为
  7. 对答案1进行一些计算,然后保存到我们的模型中.将命令2(包含这些修改后的值,因此我们无法在编译时确定它)发送给CC.这应该会触发CC中的答案2.
  8. 在收到答案2后,发送命令3,该命令将触发答案3.
  9. 在收到答案3时,解析为一个int值.如果答案3 == 0,请断开连接-完成.
  10. 答案3> 0,因此发送命令4.这将触发答案4.
  11. 对答案4进行一些计算并将结果存储在我们的模型中
  12. 然后发送命令5,它将实际触发答案3(命令5和3都触发答案3).由于我们已经订阅了答案3,因此将我们带回到上面的步骤9.我们一直循环直到答案3为0(即,我们已经保存了所有数据).
  1. scan for devices with a filter on our custom characteristic
  2. connect to a found device
  3. read a couple of standard characteristics (strings), and store these in our data model
  4. if and only if one of the characteristics matches one of an array of strings, proceed to 5. otherwise, dispose of the connection.
  5. subscribe to our custom "control" characteristic ("CC") for change notifications
  6. send command 1 to CC. this should trigger answer 1 to be set in CC, so our handler is called
  7. perform some calculations on answer 1 and save to our model. send command 2 (which includes these modified values, so we can't determine this at compile time) to CC. this should trigger answer 2 in CC.
  8. on receiving answer 2, send command 3, which should trigger answer 3.
  9. on reciving answer 3, parse into an int value. if answer 3 == 0, dispose of the connection - we are done.
  10. answer 3 > 0, so send command 4. this will trigger answer 4.
  11. perform some calculations on answer 4 and store the results in our model
  12. then send command 5, which will actually trigger answer 3 (both commands 5 and 3 trigger answer 3). since we are already subscribed to answer 3, this takes us back to step 9. above - we keep looping until answer 3 is 0 (ie. we have saved all the data).

我不愿意共享代码,因为我很清楚以下内容无法实现-但我希望它描述了我正在尝试做的事情,即使语法甚至不编译:

I was loathe to share code, as I'm well aware there is no possible way the following will work - but I'm hoping it describes what I'm trying to do even if the syntax won't even compile:

                  connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
                                                    .observeOn(AndroidSchedulers.mainThread())
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID))
                                                    .doOnNext(bytes -> {
                                                        //store the battery info in our model here
                                                    })
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
                                                    .doOnNext(bytes -> {
                                                                //store the serial number info in our model here
                                                                //TODO: how do we only proceed to the subscription if serialNumber is correct?
                                                            }
                                                    )
                                                    .flatMap(rxBleConnection -> rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID))
                                                    .doOnNext(notificationObservable -> {
                                                        // Notification has been set up
                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1); //we can't do this because rxBleConnection is out of scope!
                                                    })
                                                    .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
                                                    .subscribe(
                                                            bytes -> {
                                                                // Given characteristic has been changes, here is the value.

                                                                switch(commandFromBytes(bytes)){
                                                                    case answer1:
                                                                        int newCommand = doSomethingWith(bytes);
                                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
                                                                        break;
                                                                    case answer2:
                                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
                                                                        break;
                                                                    case answer3:
                                                                        if(bytes <= 0){
                                                                            connectedDevice.connectionDisposable.dispose();
                                                                        }
                                                                        else{
                                                                            rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
                                                                        }
                                                                        break;
                                                                    case answer4:

                                                                            doSomethingLongWindedWith(bytes);
                                                                            //then
                                                                            rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
                                                                            //command 5 will cause answer3 to be notified, so we loop back above                                                                             
                                                                        break;
                                                                }

                                                            },
                                                            throwable -> {
                                                                // Handle an error here.
                                                            }
                                                    );

在玩了探戈舞之后,我想我已经接近解决方案了:

Edit 2: after playing bracket tango for a bit, I think I'm close to a solution here:

 connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
                                                    .observeOn(AndroidSchedulers.mainThread())
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID)
                                                            .doOnNext(bytes -> {
                                                                connectedDevice.setBatLevel(bytes);
                                                            })
                                                            .flatMapSingle(rxBleConnection2 -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
                                                            .doOnNext(bytes -> {
                                                                        connectedDevice.setSerialNum(bytes);
                                                                        //we also notify a singleton listener here
                                                                    }
                                                            )
                                                            .flatMap(rxBleConnection3 -> {
                                                                        if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
                                                                            rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID).subscribe(
                                                                                    bytes -> {
                                                                                        // Given characteristic has been changes, here is the value.

                                                                                        switch (commandFromBytes(bytes)) {
                                                                                            case answer1:
                                                                                                int newCommand = doSomethingWith(bytes);
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
                                                                                                break;
                                                                                            case answer2:
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
                                                                                                break;
                                                                                            case answer3:
                                                                                                if (bytes <= 0) {
                                                                                                    //we also notify a singleton listener here
                                                                                                    connectedDevice.connectionDisposable.dispose();
                                                                                                } else {
                                                                                                    rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
                                                                                                }
                                                                                                break;
                                                                                            case answer4:

                                                                                                doSomethingLongWindedWith(bytes);
                                                                                                //then
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
                                                                                                //command 5 will cause answer3 to be notified, so we loop back above
                                                                                                break;
                                                                                        }
                                                                                    },
                                                                                    throwable -> {
                                                                                        // Handle an error here.
                                                                                    }
                                                                            );
                                                                        } else {
                                                                            connectedDevice.connectionDisposable.dispose();
                                                                        }
                                                                    }
                                                                            .doOnNext(notificationObservable -> {
                                                                                // Notification has been set up
                                                                                if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
                                                                                    rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1);
                                                                                }
                                                                            })
                                                            ));

推荐答案

根据杰克·沃顿(Jake Wharton)的演讲将是构造一个Observable,它仅发出更新模型所需的值.

The best approach, according to this Jake Wharton's talk would be to construct an Observable that would emit just values that are needed for updating your model.

(以Kotlin为例)

(example in Kotlin)

我们可以得到Observable的这些输出:

We could have these outputs of the Observable:

sealed class ConnectionEvent {
    object CloseConnection : ConnectionEvent() // dummy event to notify when the connection can be closed
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}

整个流程看起来像这样:

And the whole flow could look like this:

bleDevice.establishConnection(false)
        .flatMap { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
                    .flatMapObservable {
                        when {
                            it != matchingSerialNumber -> Observable.just(ConnectionEvent.CloseConnection as ConnectionEvent) // close connection if serial does not match
                            else -> createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
                        }
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
            )
        }
        .takeWhile { it != ConnectionEvent.CloseConnection } // if the connection is to be closed -> unsubscribe
        .subscribe(
                { connectionEvent ->
                    when(connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                    }
                },
                { /* handle errors */ }
        )

其中写/通知舞是:

private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    return connection.setupNotification(customCharUuid)
            .flatMap { ccNotifications ->
                ccNotifications.flatMap {
                    when (answerFromBytes(it)) {
                        answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(it)).ignoreEmissions()
                        answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
                        answer3 -> when (it.isEmpty()) {
                            true -> Observable.just(ConnectionEvent.CloseConnection)
                            else -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
                        }
                        answer4 -> connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()
                                .startWith(Observable.just(ConnectionEvent.Answer4(it)))
                        else -> Observable.error(Exception("Unexpected answer! => ${answerFromBytes(it)}"))
                    }
                }
                        .startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // initiate with the command1
            }
}

为了更加清晰,我使用了扩展功能:

I have used an extension function for more clarity:

fun Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<ConnectionEvent>()

我稍微更改了代码以摆脱CloseConnection事件,并利用可观察对象的完成.所以现在的输出看起来像这样:

I have changed the code a bit to get rid of CloseConnection event and leverage the completions of the observables. So now the outputs look like this:

sealed class ConnectionEvent {
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}

主要流程:

bleDevice.establishConnection(false)
        .map { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
                    .flatMapObservable {
                        if (it == matchingSerialNumber) createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
                        else Observable.empty() // do not continue if serial number does not match
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
            )
        }
        .publish {
            // create a Completable from the above Observable.concat()
            val dataDownloadCompletable = it.take(1) // take the first emission (there will be only one)
                    .flatMapCompletable { it.ignoreElements() } // and wait until the first emission completes
            it.takeUntil(dataDownloadCompletable.toObservable<Any>()) // when dataDownloadCompletable completes —> unsubscribe from the upstream, mainly .establishConnection() to close it
        }
        .flatMap { it } // unwrap the above flow
        .subscribe(
                { connectionEvent ->
                    when (connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                    }
                },
                { /* handle errors */ }
        )

写入/通知部分:

private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    return connection.setupNotification(customCharUuid)
            .flatMap { ccNotifications ->
                ccNotifications.map { Pair(answerFromBytes(it), it) } // map every response to a pair of <answer, bytes>
                        .startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // and start with writing command1 to initiate the data exchange
            }
            .takeWhile { (answer, bytes) -> !(answer == answer3 && bytes.isEmpty()) } // end the createContinuationObservable on the first answer3 with an empty bytes
            .flatMap<ConnectionEvent> { (answer, bytes) ->
                when (answer) {
                    answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(bytes)).ignoreEmissions()
                    answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
                    answer3 -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
                    answer4 -> Observable.just(ConnectionEvent.Answer4(bytes)) // when answer4 is received emit actionable item to update the model
                            .concatWith(connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()) // and send the next command5
                    else -> Observable.error(Exception("Unexpected answer! => $answer"))
                }
            }
}

扩展名:

fun <T> Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<T>()

这篇关于如何通过发送新的写入命令来响应BLE特性通知的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆