正确的MQTT订阅代码,可保持持久性 [英] Proper MQTT subscription code that maintains persistence

查看:383
本文介绍了正确的MQTT订阅代码,可保持持久性的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找用于订阅给定主题的MQTT客户端的Java代码,在该主题上发布的每条消息应仅到达客户端一次.我编写了许多代码,并且在所有情况下,消息均已正确地传递给客户端当它连接到代理,但是如果订阅的客户端与代理断开连接一段时间,然后再次连接回去时,它没有收到未连接期间发送的消息,并且我设置了干净会话标志同样是错误的,但仍然无法正常工作,下面给出了我使用的代码

I am looking for a java code for MQTT client that subscribes to a given topic, every message published on that topic should reach the client only once.I have written many codes and in all the cases messages are delivered properly to the client when it is connected to the broker but if the subscribed client disconnects from the broker for some time and then again connects back, it does not receive the messages that were sent during the time that it was not connected and I have set the clean session flag also as false but still its not working, the code that I used is given below

import org.fusesource.hawtbuf.*;
import org.fusesource.mqtt.client.*;

/**
 * Uses an callback based interface to MQTT.  Callback based interfaces
 * are harder to use but are slightly more efficient.
 */
class Listener {

    public static void main(String []args) throws Exception {

        String user = env("APOLLO_USER", "admin");
        String password = env("APOLLO_PASSWORD", "password");
        String host = env("APOLLO_HOST", "localhost");
        int port = Integer.parseInt(env("APOLLO_PORT", "61613"));
        final String destination = arg(args, 1, "subject");


        MQTT mqtt = new MQTT();
        mqtt.setHost(host, port);
        mqtt.setUserName(user);
        mqtt.setPassword(password);
    mqtt.setCleanSession(false);
    mqtt.setClientId("newclient");

        final CallbackConnection connection = mqtt.callbackConnection();
        connection.listener(new org.fusesource.mqtt.client.Listener() {
            long count = 0;
            long start = System.currentTimeMillis();

            public void onConnected() {
            }
            public void onDisconnected() {
            }
            public void onFailure(Throwable value) {
                value.printStackTrace();
                System.exit(-2);
            }
            public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
                System.out.println("Nisha Messages : " + msg);
                System.out.println("Nisha topic" + topic);
                System.out.println("Nisha Receive acknowledgement : " + ack);
                String body = msg.utf8().toString();
                if("SHUTDOWN".equals(body)) {
                    long diff = System.currentTimeMillis() - start;
                    System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
                    connection.disconnect(new Callback<Void>() {
                        @Override
                        public void onSuccess(Void value) {
                            System.exit(0);
                        }
                        @Override
                        public void onFailure(Throwable value) {
                            value.printStackTrace();
                            System.exit(-2);
                        }
                    });
                } else {
                    if( count == 0 ) {
                        start = System.currentTimeMillis();
                    }
                    if( count % 1000 == 0 ) {
                        System.out.println(String.format("Received %d messages.", count));
                    }
                    count ++;
                }
            }
        });
        connection.connect(new Callback<Void>() {
            @Override
            public void onSuccess(Void value) {
                System.out.println("connected in :::: ");
                Topic[] topics = {new Topic(destination, QoS.AT_MOST_ONCE)};
                connection.subscribe(topics, new Callback<byte[]>() {
                    public void onSuccess(byte[] qoses) {
                    }
                    public void onFailure(Throwable value) {
                        value.printStackTrace();
                        System.exit(-2);
                    }
                });
            }
            @Override
            public void onFailure(Throwable value) {
                value.printStackTrace();
                System.exit(-2);
            }
        });

        // Wait forever..
        synchronized (Listener.class) {
            while(true)
                Listener.class.wait();
        }
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if( rc== null )
            return defaultValue;
        return rc;
    }

    private static String arg(String []args, int index, String defaultValue) {
        if( index < args.length )
            return args[index];
        else
            return defaultValue;
    }
}

我在这里做错什么了吗?

Am I doing something wrong here?

推荐答案

它没有收到未连接期间发送的消息

it does not receive the messages that were sent during the time that it was not connected

MQTT 保留所有消息.如果客户端下线,则未送达的邮件将丢失.保留机制仅保留发布到主题的最后消息.

MQTT does not retain all messages. If the client goes offline, undelivered messages are lost. The retain mechanism retains only the last message published to a topic.

您可以在规范中进一步了解要点 3.3.1.3保留

这篇关于正确的MQTT订阅代码,可保持持久性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆