MQTT从主线程发送消息 [英] MQTT send message from main thread
问题描述
我在MqttHelper
类中实现了简单的MQTT订阅服务器,该订阅服务器可以正常工作并接收订阅.但是,当我需要从主程序向服务器发送消息时应该如何处理.我有方法publish
在IMqttActionListener
上工作正常,但是如何在按钮按下事件时从主程序发送文本?
package com.kkk.mqtt.helpers;
import android.content.Context;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.UnsupportedEncodingException;
public class MqttHelper {
public MqttAndroidClient mqttAndroidClient;
final String serverUri = "tcp://tailor.cloudmqtt.com:16424";
final String clientId = "ExampleAndroidClient";
public final String subscriptionTopic = "sensor";
final String username = "xxx";
final String password = "yyy";
public MqttHelper(Context context){
mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b, String s) {
Log.w("mqtt", s);
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("Mqtt", mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
connect();
}
public void setCallback(MqttCallbackExtended callback) {
mqttAndroidClient.setCallback(callback);
}
public void publish(String topic, String info)
{
byte[] encodedInfo = new byte[0];
try {
encodedInfo = info.getBytes("UTF-8");
MqttMessage message = new MqttMessage(encodedInfo);
mqttAndroidClient.publish(topic, message);
Log.e ("Mqtt", "publish done");
} catch (UnsupportedEncodingException | MqttException e) {
e.printStackTrace();
Log.e ("Mqtt", e.getMessage());
}catch (Exception e) {
Log.e ("Mqtt", "general exception "+e.getMessage());
}
}
private void connect(){
Log.w("Mqtt", "connect start " );
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
try {
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener()
{
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.w("Mqtt", "onSuccess " );
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
subscribeToTopic();
publish(MqttHelper.this.subscriptionTopic,"information");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.w("Mqtt", "Failed to connect to: " + serverUri + exception.toString());
}
});
} catch (MqttException ex){
ex.printStackTrace();
}
}
private void subscribeToTopic() {
try {
mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.w("Mqtt","Subscribed!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.w("Mqtt", "Subscribed fail!");
}
});
} catch (MqttException ex) {
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
}
启动MQTT订户的代码:
private void startMqtt() {
mqttHelper = new MqttHelper(getApplicationContext());
mqttHelper.setCallback(new MqttCallbackExtended()
{
@Override
public void connectComplete(boolean b, String s) {
Log.w("Mqtt", "Connect complete"+ s );
}
@Override
public void connectionLost(Throwable throwable) {
Log.w("Mqtt", "Connection lost" );
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("Mqtt", mqttMessage.toString());
dataReceived.setText(mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.w("Mqtt", "Delivery complete" );
}
});
Log.w("Mqtt", "will publish");
}
Paho不在UI线程上运行,但是它可以异步地回调到UI线程.
只需让Activity
或Fragment
实现MqttCallbackExtended
接口:
public class SomeActivity extends AppCompatActivity implements MqttCallbackExtended {
...
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.d("Mqtt", "Connect complete > " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
Log.d("Mqtt", "Connection lost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d("Mqtt", "Received > " + topic + " > " + message.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d("Mqtt", "Delivery complete");
}
}
并使用SomeActivity
作为MqttCallbackExtended listener
构造MqttHelper
:
public MqttHelper(Context context, MqttCallbackExtended listener) {
this.mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
this.mqttAndroidClient.setCallback(listener);
}
例如:
this.mqttHelper = new MqttHelper(this);
this.mqttHelper.setCallback(this);
this.mqttHelper.publish("Java", "SomeActivity will handle the callbacks.");
在Application
中处理这些是有问题的,因为Application
没有UI且Context
没有Theme
.但是对于扩展Activity
,Fragment
,DialogFragment
,RecyclerView.Adapter
等的类,当想要与其 UI进行交互时,实现回调interface
是有意义的.>
作为参考, extends
MqttCallback
.
I have simple MQTT subscriber implemented in MqttHelper
class that works fine and receives subscriptions. But how I should deal when I need to send message to server from main program. I have method publish
that works fine from IMqttActionListener
but how to send text from main program on button pressed event?
package com.kkk.mqtt.helpers;
import android.content.Context;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.UnsupportedEncodingException;
public class MqttHelper {
public MqttAndroidClient mqttAndroidClient;
final String serverUri = "tcp://tailor.cloudmqtt.com:16424";
final String clientId = "ExampleAndroidClient";
public final String subscriptionTopic = "sensor";
final String username = "xxx";
final String password = "yyy";
public MqttHelper(Context context){
mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b, String s) {
Log.w("mqtt", s);
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("Mqtt", mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
connect();
}
public void setCallback(MqttCallbackExtended callback) {
mqttAndroidClient.setCallback(callback);
}
public void publish(String topic, String info)
{
byte[] encodedInfo = new byte[0];
try {
encodedInfo = info.getBytes("UTF-8");
MqttMessage message = new MqttMessage(encodedInfo);
mqttAndroidClient.publish(topic, message);
Log.e ("Mqtt", "publish done");
} catch (UnsupportedEncodingException | MqttException e) {
e.printStackTrace();
Log.e ("Mqtt", e.getMessage());
}catch (Exception e) {
Log.e ("Mqtt", "general exception "+e.getMessage());
}
}
private void connect(){
Log.w("Mqtt", "connect start " );
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
try {
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener()
{
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.w("Mqtt", "onSuccess " );
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
subscribeToTopic();
publish(MqttHelper.this.subscriptionTopic,"information");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.w("Mqtt", "Failed to connect to: " + serverUri + exception.toString());
}
});
} catch (MqttException ex){
ex.printStackTrace();
}
}
private void subscribeToTopic() {
try {
mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.w("Mqtt","Subscribed!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.w("Mqtt", "Subscribed fail!");
}
});
} catch (MqttException ex) {
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
}
Code that starts MQTT subscriber:
private void startMqtt() {
mqttHelper = new MqttHelper(getApplicationContext());
mqttHelper.setCallback(new MqttCallbackExtended()
{
@Override
public void connectComplete(boolean b, String s) {
Log.w("Mqtt", "Connect complete"+ s );
}
@Override
public void connectionLost(Throwable throwable) {
Log.w("Mqtt", "Connection lost" );
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("Mqtt", mqttMessage.toString());
dataReceived.setText(mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.w("Mqtt", "Delivery complete" );
}
});
Log.w("Mqtt", "will publish");
}
Paho does not run on the UI thread, but it may asynchronously call back to the UI thread.
Just let an Activity
or Fragment
implement the MqttCallbackExtended
interface:
public class SomeActivity extends AppCompatActivity implements MqttCallbackExtended {
...
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.d("Mqtt", "Connect complete > " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
Log.d("Mqtt", "Connection lost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d("Mqtt", "Received > " + topic + " > " + message.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d("Mqtt", "Delivery complete");
}
}
And construct the MqttHelper
with SomeActivity
as it's MqttCallbackExtended listener
:
public MqttHelper(Context context, MqttCallbackExtended listener) {
this.mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
this.mqttAndroidClient.setCallback(listener);
}
For example:
this.mqttHelper = new MqttHelper(this);
this.mqttHelper.setCallback(this);
this.mqttHelper.publish("Java", "SomeActivity will handle the callbacks.");
Handling these in Application
is problematic, because Application
has no UI and it's Context
has no Theme
. But for classes extending Activity
, Fragment
, DialogFragment
, RecyclerView.Adapter
, etc. it makes senses to implement the callback interface
, when wanting to interact with their UI.
For reference, MqttCallbackExtended
extends
MqttCallback
.
这篇关于MQTT从主线程发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!