QoS=1 subscription with MqttAsyncClient misses messages


Question

I have a foreground service acting as an MQTT client. I'm using MqttAsyncClient mqttClient for this purpose.
I'm using QoS=1 when subscribing to the topic:

mqttClient.subscribe("sensors/s1/", 1);

But if my phone goes offline for some period of time, it misses the messages published during that period. The whole code is below.

In another application of mine I'm using MqttAndroidClient mqttAndroidClient, and in that case QoS=1 delivers all the missed messages.

mqttAndroidClient.subscribe(topic, 1, null, new IMqttActionListener() {...})

Why does a QoS=1 subscription with MqttAsyncClient not retrieve all messages?

Whole code:

public class MqttGndService extends Service {

    private String ip="ssl:myserver",port="8887";
    private final IBinder mBinder = new LocalBinder();
    private Handler mHandler;


    private static final String TAG = "mqttservice";
    private static boolean hasWifi = false;
    private static boolean hasMmobile = false;
    private ConnectivityManager mConnMan;
    private volatile IMqttAsyncClient mqttClient;
    private String uniqueID;


    class MQTTBroadcastReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {

            IMqttToken token;
            boolean hasConnectivity = false;
            boolean hasChanged = false;
            NetworkInfo infos[] = mConnMan.getAllNetworkInfo();
            for (int i = 0; i < infos.length; i++) {
                if (infos[i].getTypeName().equalsIgnoreCase("MOBILE")) {
                    if ((infos[i].isConnected() != hasMmobile)) {
                        hasChanged = true;
                        hasMmobile = infos[i].isConnected();
                    }
                    Timber.tag(Utils.TIMBER_TAG).v( infos[i].getTypeName() + " is " + infos[i].isConnected());
                } else if (infos[i].getTypeName().equalsIgnoreCase("WIFI")) {
                    if ((infos[i].isConnected() != hasWifi)) {
                        hasChanged = true;
                        hasWifi = infos[i].isConnected();
                    }
                    Timber.tag(Utils.TIMBER_TAG).v(infos[i].getTypeName() + " is " + infos[i].isConnected());
                }
            }
            hasConnectivity = hasMmobile || hasWifi;
            Timber.tag(Utils.TIMBER_TAG).v( "hasConn: " + hasConnectivity + " hasChange: " + hasChanged + " - " + (mqttClient == null || !mqttClient.isConnected()));
            if (hasConnectivity && hasChanged && (mqttClient == null || !mqttClient.isConnected())) {
                Timber.tag(Utils.TIMBER_TAG).v("Ready to connect");
                doConnect();
                Timber.tag(Utils.TIMBER_TAG).v("do connect done");

            } else
            {
                Timber.tag(Utils.TIMBER_TAG).v("Connection not possible");
            }



        }
    }


    public class LocalBinder extends Binder {
        public MqttGndService getService() {
            // Return this instance of LocalService so clients can call public methods
            return MqttGndService.this;
        }
    }

    @Override
    public IBinder onBind(Intent intent) {
        return mBinder;
    }

    public void publish(String topic, MqttMessage message) {
        SharedPreferences sharedPref = PreferenceManager.getDefaultSharedPreferences(this);// we create a 'shared" memory where we will share our preferences for the limits and the values that we get from onsensorchanged
        try {

            mqttClient.publish(topic, message);

        } catch (MqttException e) {
            e.printStackTrace();
        }

    }


    @Override
    public void onCreate() {
        Timber.tag(Utils.TIMBER_TAG).v("Creating MQTT service");
        mHandler = new Handler();//for toasts
        IntentFilter intentf = new IntentFilter();
        setClientID();
        intentf.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(new MQTTBroadcastReceiver(), new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
        mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    }

    @Override
    public void onConfigurationChanged(Configuration newConfig) {
        Timber.tag(Utils.TIMBER_TAG).v( "onConfigurationChanged()");
        android.os.Debug.waitForDebugger();
        super.onConfigurationChanged(newConfig);

    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        Timber.tag(Utils.TIMBER_TAG).v("Service onDestroy");

    }


    private void setClientID() {
        uniqueID = android.provider.Settings.Secure.getString(getContentResolver(), android.provider.Settings.Secure.ANDROID_ID);
        Timber.tag(Utils.TIMBER_TAG).v("uniqueID=" + uniqueID);

    }


    private void doConnect() {
        String broker = ip + ":" + port;
        Timber.tag(Utils.TIMBER_TAG).v("mqtt_doConnect()");
        IMqttToken token;
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setMaxInflight(100);//handle more messages!!so as not to disconnect
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(1000);
        options.setKeepAliveInterval(300);
        options.setUserName("cc50e3e91bf4");
        options.setPassword("b".toCharArray());

        try {
            options.setSocketFactory(SocketFactoryMQ.getSocketFactory(this,""));
        } catch (KeyStoreException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (KeyManagementException e) {
            e.printStackTrace();
        } catch (CertificateException e) {
            e.printStackTrace();
        } catch (UnrecoverableKeyException e) {
            e.printStackTrace();
        }

        Timber.tag(Utils.TIMBER_TAG).v("set socket factory done");
        try {


            mqttClient = new MqttAsyncClient(broker, uniqueID, new MemoryPersistence());
            token = mqttClient.connect(options);
            token.waitForCompletion(3500);

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    try {
                        mqttClient.disconnectForcibly();
                        mqttClient.connect();
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void messageArrived(String topic, MqttMessage msg) throws Exception {
                    Timber.tag(Utils.TIMBER_TAG).v("Message arrived from topic " + topic+ "  msg: " + msg );



                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("published");
                }
            });
            Timber.tag(Utils.TIMBER_TAG).v("will subscribe");
            mqttClient.subscribe("sensors/s1/", 1);



        } catch (MqttSecurityException e) {
            Timber.tag(Utils.TIMBER_TAG).v("general connect exception");
            e.printStackTrace();
        } catch (MqttException e) {
            switch (e.getReasonCode()) {
                case MqttException.REASON_CODE_BROKER_UNAVAILABLE:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE BROKER_UNAVAILABLE!", 1500));
                    break;
                case MqttException.REASON_CODE_CLIENT_TIMEOUT:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE CLIENT_TIMEOUT!", 1500));
                    break;
                case MqttException.REASON_CODE_CONNECTION_LOST:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE CONNECTION_LOST!", 1500));
                    break;
                case MqttException.REASON_CODE_SERVER_CONNECT_ERROR:
                    Timber.tag(Utils.TIMBER_TAG).v( "c " + e.getMessage());
                    e.printStackTrace();
                    break;
                case MqttException.REASON_CODE_FAILED_AUTHENTICATION:
                    Intent i = new Intent("RAISEALLARM");
                    i.putExtra("ALLARM", e);
                    Timber.tag(Utils.TIMBER_TAG).v("b " + e.getMessage());
                    break;
                default:
                    Timber.tag(Utils.TIMBER_TAG).v( "a " + e.getMessage() +" "+ e.toString());
            }
        }
        mHandler.post(new ToastRunnable("WE ARE ONLINE!", 500));

    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Timber.tag(Utils.TIMBER_TAG).v("onStartCommand");
        String input = intent.getStringExtra(INTENT_ID);
        Timber.tag(Utils.TIMBER_TAG).v("onStartCommand "+ input);

        Intent notificationIntent = new Intent(this, MainActivity.class);
        PendingIntent pendingIntent = PendingIntent.getActivity(this,
                0, notificationIntent, 0);

        Notification notification = new NotificationCompat.Builder(this, CHANNEL_ID)
                .setContentTitle("Example Service")
                .setContentText(input)
                .setSmallIcon(R.drawable.ic_android)
                .setContentIntent(pendingIntent)
                .build();

        startForeground(1, notification);


        PowerManager powerManager = (PowerManager) getSystemService(POWER_SERVICE);
        PowerManager.WakeLock wakeLock = powerManager.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MyApp::MyWakelockTag");
        wakeLock.acquire();


        return START_STICKY;
    }
}

Recommended answer

You are setting cleansession to true (options.setCleanSession(true)); from the docs for setCleanSession:

If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means

  • Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
  • The server will treat a subscription as non-durable

I think that the MQTT spec states this more clearly:

If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session.

So when your application loses the connection, the session is discarded and new messages will not be queued up for delivery. In addition, unless you resubscribe when the connection comes back up, you will not receive any further messages.
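To make the difference concrete, here is a toy in-memory model of how a broker treats the two settings. It is only a sketch for illustration: ToyBroker, Session and deliveredAfterReconnect are hypothetical names, it does not use the Paho API, and a real broker tracks far more state (per-topic subscriptions, acknowledgements, queue limits).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SessionSketch {

    /** Toy broker: a single topic, QoS 1 only, sessions keyed by client ID. */
    static class ToyBroker {
        static class Session {
            boolean clean;
            boolean connected;
            boolean subscribed;
            final Deque<String> queue = new ArrayDeque<>();
        }

        private final Map<String, Session> sessions = new HashMap<>();

        /** Connects a client and returns the messages queued while it was offline. */
        List<String> connect(String clientId, boolean cleanSession) {
            Session s = sessions.get(clientId);
            if (cleanSession || s == null) {
                // Clean session: any previous state is discarded.
                s = new Session();
                sessions.put(clientId, s);
            }
            s.clean = cleanSession;
            s.connected = true;
            List<String> backlog = new ArrayList<>(s.queue);
            s.queue.clear();
            return backlog;
        }

        void subscribe(String clientId) {
            sessions.get(clientId).subscribed = true;
        }

        void disconnect(String clientId) {
            Session s = sessions.get(clientId);
            s.connected = false;
            if (s.clean) {
                // The session lasts only as long as the network connection.
                sessions.remove(clientId);
            }
        }

        void publish(String message) {
            for (Session s : sessions.values()) {
                if (s.subscribed && !s.connected) {
                    s.queue.add(message); // held for an offline persistent session
                }
            }
        }
    }

    /** Subscribes, goes offline, misses two messages, then reconnects. */
    static List<String> deliveredAfterReconnect(boolean cleanSession) {
        ToyBroker broker = new ToyBroker();
        broker.connect("phone", cleanSession);
        broker.subscribe("phone");
        broker.disconnect("phone");
        broker.publish("m1");
        broker.publish("m2");
        return broker.connect("phone", cleanSession);
    }

    public static void main(String[] args) {
        System.out.println("cleanSession=true  -> " + deliveredAfterReconnect(true));
        System.out.println("cleanSession=false -> " + deliveredAfterReconnect(false));
    }
}
```

With cleanSession=true the model returns an empty backlog after the reconnect, which matches what the question describes; with cleanSession=false both messages published while offline are returned.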

However, be aware that if you set cleansession to false, any new messages received while your client is offline will be queued for delivery (subject to the configuration of the broker), and this might not be what you want if the client could be offline for a long time.
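If a persistent session is what you want, the connect options might look roughly like the sketch below. It assumes the Eclipse Paho Java client (org.eclipse.paho.client.mqttv3) is on the classpath; the class PersistentSessionSketch and the helper connectPersistent are hypothetical names, and the TLS socket factory and credentials from the question are left out. The client ID has to be the same on every connect, otherwise the broker cannot match the client to its stored session.

```java
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PersistentSessionSketch {

    // Hypothetical helper, not part of the question's code.
    static IMqttAsyncClient connectPersistent(String broker, String clientId) throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        // Ask the broker to keep the subscription and queue QoS 1 messages while offline.
        options.setCleanSession(false);
        options.setAutomaticReconnect(true);

        IMqttAsyncClient client = new MqttAsyncClient(broker, clientId, new MemoryPersistence());

        // Registering the callback before connecting is a precaution, so that
        // messages delivered right after the connection is up have a handler.
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                // Automatic reconnect is enabled, so nothing is done here.
            }

            @Override
            public void messageArrived(String topic, MqttMessage msg) {
                System.out.println("Message arrived from topic " + topic + " msg: " + msg);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Not used: this sketch only subscribes.
            }
        });

        client.connect(options).waitForCompletion(3500);
        client.subscribe("sensors/s1/", 1).waitForCompletion(3500);
        return client;
    }
}
```

How many messages are kept for an offline client, and for how long, is decided by the broker configuration, so it is worth checking those limits before relying on this.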
