MQTT PAHO客户端未自动重新连接到Android服务上的Broker [英] MQTT Paho Client not reconnect automatically to broker on Android Service

查看:95
本文介绍了MQTT PAHO客户端未自动重新连接到Android服务上的Broker的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个服务来管理我的MQTT客户端连接,MQTT工作得很好,但问题是当我重新启动Broker Server时,Android客户端无法重新连接。onConnectionLost()回调触发异常。

备注

  1. 我在同一台计算机上使用Moquette Broker->Moquette
  2. 我有两个Android客户端应用程序,一个使用服务(有问题的)和其他在没有服务的线程上工作(这很好用,重新连接也可以)。
  3. 我无法运行Android客户端MQTT lib,因为我使用的是Eclipse PAHO MQTT。
  4. 是,我制作setAutomaticReconnect(true);

问题

使用Service的Android应用程序永远有效,而不是重新连接到MQTT Broker。

代码

MQTTService.java

public class MQTTService extends Service implements MqttCallbackExtended {

    boolean running;
private static final String TAG = "MQTTService";

public static final String ACTION_MQTT_CONNECTED = "ACTION_MQTT_CONNECTED";
public static final String ACTION_MQTT_DISCONNECTED = "ACTION_MQTT_DISCONNECTED";
public static final String ACTION_DATA_ARRIVED = "ACTION_DATA_ARRIVED";

// MQTT
MqttClient mqttClient;
final String serverURI = "tcp://"+ServidorServices.IP+":1883";
final String clientId = "Responsavel";
String topicoId;
Thread mqttStartThread;

public boolean subscribe(String topic) {
    try {
        Log.i(TAG,"Subscripe: " + topic);
        mqttClient.subscribe(topic);
        mqttClient.subscribe("LOCATION_REAL");
        return true;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

// Life Cycle
@Override
public IBinder onBind(Intent intent) {
    Log.d(TAG,"onBind()");
    return null;
}

@Override
public void onCreate() {
    Log.d(TAG,"onCreate()");
    running = true;
    topicoId = getSharedPreferences("myprefs",MODE_PRIVATE).getString("tag_id_aluno","0");

    mqttStartThread = new MQTTStartThread(this);

    if(topicoId.equals("0")) {
        Log.i(TAG,"Error to subscribe");
        return;
    }

    mqttStartThread.start();
}

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    Log.d(TAG,"onStartCommand()");
    return super.onStartCommand(intent, flags, startId);
}

class MQTTStartThread extends Thread {

    MqttCallbackExtended mqttCallbackExtended;

    public MQTTStartThread(MqttCallbackExtended callbackExtended) {
        this.mqttCallbackExtended = callbackExtended;
    }

    @Override
    public void run() {
        try {
            mqttClient = new MqttClient(serverURI,clientId,new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            mqttClient.setCallback(mqttCallbackExtended);
            mqttClient.connect();
        } catch (Exception e) {
            Log.i(TAG,"Exception MQTT CONNECT: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

@Override
public void onDestroy() {
    Log.d(TAG,"onDestroy()");
    running = false;
    if (mqttClient != null) {
        try {
            if (mqttClient.isConnected()) mqttClient.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@Override
public boolean onUnbind(Intent intent) {
    Log.i(TAG,"onUnbind()");
    return super.onUnbind(intent);
}

// Callbacks MQTT
@Override
public void connectComplete(boolean reconnect, String serverURI) {
    Log.i(TAG,"connectComplete()");
    if (topicoId == null) {
        Log.i(TAG,"Erro ao ler ID da Tag");
        return;
    }
    sendBroadcast(new Intent(ACTION_MQTT_CONNECTED));
    subscribe(topicoId);
}

@Override
public void connectionLost(Throwable cause) {
    Log.i(TAG,"connectionLost(): " + cause.getMessage());
    cause.printStackTrace();
    sendBroadcast(new Intent(ACTION_MQTT_DISCONNECTED));
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    Log.i(TAG,"messageArrived() topic: " + topic);

    if (topic.equals("LOCATION_REAL")) {
        Log.i(TAG,"Data: " + new String(message.getPayload()));
    } else {
        Context context = MQTTService.this;
        String data = new String(message.getPayload());
        Intent intent = new Intent(context,MapsActivity.class);
        intent.putExtra("location",data);
        LatLng latLng = new LatLng(Double.valueOf(data.split("_")[0]),Double.valueOf(data.split("_")[1]));
        String lugar = Utils.getAddressFromLatLng(latLng,getApplicationContext());
        NotificationUtil.create(context,intent,"Embarque",lugar,1);

        if (data.split("_").length < 3) {
            return;
        }

        double latitude = Double.valueOf(data.split("_")[0]);
        double longitude = Double.valueOf(data.split("_")[1]);
        String horario = data.split(" ")[2];

        Intent iMqttBroadcast = new Intent(ACTION_DATA_ARRIVED);
        iMqttBroadcast.putExtra("topico",String.valueOf(topic));
        iMqttBroadcast.putExtra("latitude",latitude);
        iMqttBroadcast.putExtra("longitude",longitude);
        iMqttBroadcast.putExtra("evento","Embarcou");
        iMqttBroadcast.putExtra("horario",horario);

        sendBroadcast(iMqttBroadcast);
    }
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    Log.i(TAG,"deliveryComplete()");
}
}

异常堆栈跟踪

I/MQTTService: connectionLost(): Connection lost
W/System.err: Connection lost (32109) - java.io.EOFException
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
W/System.err:     at java.lang.Thread.run(Thread.java:818)
W/System.err: Caused by: java.io.EOFException
W/System.err:     at java.io.DataInputStream.readByte(DataInputStream.java:77)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
W/System.err:   ... 1 more

推荐答案

我想您忘了将MqttConnectOptions包括在MqttClient对象中。

请像下面这样尝试

mqttClient.connect(options);

而不是

mqttClient.connect();

希望这可能有助于解决您的重新连接问题。

这篇关于MQTT PAHO客户端未自动重新连接到Android服务上的Broker的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆