无法接收已发布的消息以订阅mqtt paho上的主题 [英] Cannot receive already published messages to subscribed topic on mqtt paho

查看:203
本文介绍了无法接收已发布的消息以订阅mqtt paho上的主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Paho 发送和接收 MQTT 消息。到目前为止,发送消息没有问题,但接收消息时遇到了问题。我的代码如下:

I'm using Paho to send and receive MQTT messages. So far it has been no problem to send the messages. I have problems with receiving them. My code is:

     package BenchMQTT;

     import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
     import org.eclipse.paho.client.mqttv3.IMqttToken;
     import org.eclipse.paho.client.mqttv3.MqttCallback;
     import org.eclipse.paho.client.mqttv3.MqttException;
     import org.eclipse.paho.client.mqttv3.MqttMessage;
     import org.eclipse.paho.client.mqttv3.MqttClient;

     /**
      * Question's demo: a sender and a receiver client talk to the same broker
      * on the same topic while both are connected.
      *
      * <p>The receiver is subscribed before the sender starts publishing, so
      * each published message can be handed to {@link #messageArrived} while
      * the publish loop is still running.
      */
     public class Test_A_2 implements MqttCallback {

         /** Client that subscribes to the topic and receives the messages. */
         MqttClient clientR;

         /** Client that publishes the messages. */
         MqttClient clientS;

         public Test_A_2() {
         }

         /** Creates the demo object and runs it once. */
         public static void main(String[] args) throws InterruptedException {
             long startedAt = System.currentTimeMillis();
             Test_A_2 demo = new Test_A_2();
             demo.doDemo();
             long finishedAt = System.currentTimeMillis();
         }

         /**
          * Connects both clients, subscribes the receiver, publishes ten
          * messages (printing the loop index before each publish), and then
          * disconnects and closes both clients.
          *
          * <p>Any {@link MqttException} is reported only as the line
          * {@code ERROR} on standard output.
          */
         public void doDemo() throws InterruptedException {
             final String brokerUri = "tcp://mybroker:1883";
             final String topic = "BenchMQTT";
             try {
                 clientS = new MqttClient(brokerUri, "Sender");
                 clientR = new MqttClient(brokerUri, "Reiever");
                 clientR.connect();
                 clientS.connect();
                 MqttMessage outgoing = new MqttMessage();

                 String body = "qwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghjk"
                         + "lzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghj"
                         + "klzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfgh"
                         + "jklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfg"
                         + "hjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasd"
                         + "fghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopas"
                         + "dfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopa"
                         + "sdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiop"
                         + "asdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuio"
                         + "pasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyui"
                         + "opasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyu"
                         + "iopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwerty"
                         + "uiopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwert"
                         + "nmqwertyuiop";

                 // The receiver is live from here on, so deliveries may start
                 // as soon as the first publish below goes out.
                 clientR.subscribe(topic);
                 clientR.setCallback(this);

                 int sent = 0;
                 while (sent < 10) {
                     outgoing.setPayload(body.getBytes());
                     System.out.println(sent);
                     clientS.publish(topic, outgoing);
                     sent++;
                 }

                 clientR.disconnect();
                 clientS.disconnect();
                 clientR.close();
                 clientS.close();
             } catch (MqttException failure) {
                 System.out.println("ERROR");
             }
         }

         /** Connection loss is not handled by this demo. */
         @Override
         public void connectionLost(Throwable cause) {
             // Intentionally left empty.
         }

         /** Prints each incoming message prefixed with {@code Received: }. */
         @Override
         public void messageArrived(String topic, MqttMessage message) {
             System.out.println("Received: " + message.toString());
         }

         /** Delivery confirmations are not used by this demo. */
         @Override
         public void deliveryComplete(IMqttDeliveryToken token) {
             // Intentionally left empty.
         }

     }

这段代码可以发送并接收消息。

输出:

0
Received: 0
1
Received: 1
2
Received: 2
3
Received: 3
4
Received: 4
5
Received: 5
6
Received: 6
7
Received: 7
8
Received: 8
9
Received: 9

我希望先把所有消息发送完,然后再接收它们。有什么办法吗?
预期输出:

I would like to send messages, and after that receive them. Any help? Expected OUTPUT:

0
1
2
3
4
5
6
7
8
9
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9


推荐答案

以下代码可以满足您的需求,但它是在强迫 MQTT 以一种本不该有的方式工作。代理(broker)上的消息排队只是为了确保即使客户端断开连接一段时间,所有消息仍能送达该客户端;消息总是会在尽可能早的时机被投递。

The following code does what you want, but it is forcing MQTT to behave in a way it is not supposed to. Message queuing on the broker is only intended to ensure all messages are delivered to a client even if it is disconnected for a period; messages will always be delivered at the earliest possible opportunity.

 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;

 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.IMqttToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;

 /**
  * Demonstrates "publish everything first, receive afterwards" with Paho.
  *
  * <p>The receiver connects with a persistent (non-clean) session, subscribes
  * at QoS 2 and disconnects. The sender then publishes while the receiver is
  * offline, and the receiver reconnects to collect what was published in the
  * meantime.
  *
  * <p>An instance is single-use: the internal latch counts down once per
  * received message and is never reset.
  */
 public class Test_A_2 implements MqttCallback {

     private static final String BROKER_URI = "tcp://localhost:1883";
     private static final String TOPIC = "BenchMQTT";
     private static final int QOS = 2;
     private static final int MESSAGE_COUNT = 10;

     /** Upper bound on how long the receiver waits for the queued messages. */
     private static final long RECEIVE_TIMEOUT_SECONDS = 30;

     private static final String MESSAGE_PAYLOAD =
             "qwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghjk"
             + "lzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghj"
             + "klzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfgh"
             + "jklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfg"
             + "hjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasd"
             + "fghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopas"
             + "dfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopa"
             + "sdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiop"
             + "asdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuio"
             + "pasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyui"
             + "opasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyu"
             + "iopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwerty"
             + "uiopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwert"
             + "nmqwertyuiop";

     /** Client that subscribes to the topic and receives the messages. */
     MqttClient clientR;

     /** Client that publishes the messages. */
     MqttClient clientS;

     /** Reaches zero once {@link #MESSAGE_COUNT} messages have arrived. */
     private final CountDownLatch pending = new CountDownLatch(MESSAGE_COUNT);

     public Test_A_2() {
     }

     /** Creates the demo object and runs it once. */
     public static void main(String[] args) throws InterruptedException {
         new Test_A_2().doDemo();
     }

     /**
      * Runs the demo: subscribe, go offline, publish, come back and receive.
      *
      * <p>An {@link MqttException} is reported on standard output and its
      * stack trace is printed. Both clients are disconnected and closed
      * whether or not the demo succeeded.
      *
      * @throws InterruptedException if the thread is interrupted while
      *     waiting for the queued messages to arrive
      */
     public void doDemo() throws InterruptedException {
         try {
             // A non-clean session asks the broker to keep the subscription
             // and queue matching QoS 1/2 messages while the client is offline.
             MqttConnectOptions options = new MqttConnectOptions();
             options.setCleanSession(false);

             clientS = new MqttClient(BROKER_URI, "Sender");
             clientR = new MqttClient(BROKER_URI, "Reiever");

             // Install the callback before connecting so that nothing handed
             // over right after the connection is established can be missed.
             clientR.setCallback(this);
             clientR.connect(options);
             clientS.connect();
             clientR.subscribe(TOPIC, QOS);

             // Go offline so the messages below are queued, not delivered.
             clientR.disconnect();

             // Payload and QoS are the same for every message: set them once.
             MqttMessage message =
                     new MqttMessage(MESSAGE_PAYLOAD.getBytes(StandardCharsets.UTF_8));
             message.setQos(QOS);
             for (int i = 0; i < MESSAGE_COUNT; i++) {
                 System.out.println(i);
                 clientS.publish(TOPIC, message);
             }

             // The subscription belongs to the persistent session, so simply
             // reconnecting is enough; no second subscribe is needed.
             clientR.connect(options);

             // Delivery is asynchronous: wait for the callback to see every
             // message instead of disconnecting straight away.
             if (!pending.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                 System.out.println("Timed out waiting for queued messages, still missing: "
                         + pending.getCount());
             }
         } catch (MqttException e) {
             System.out.println("ERROR");
             e.printStackTrace();
         } finally {
             shutdown(clientR);
             shutdown(clientS);
         }
     }

     /** Disconnects (if connected) and closes a client; failures are only reported. */
     private static void shutdown(MqttClient client) {
         if (client == null) {
             return;
         }
         try {
             if (client.isConnected()) {
                 client.disconnect();
             }
             client.close();
         } catch (MqttException e) {
             System.out.println("ERROR");
             e.printStackTrace();
         }
     }

     /** Connection loss is not handled by this demo. */
     @Override
     public void connectionLost(Throwable cause) {
         // Intentionally left empty.
     }

     /** Prints the incoming message and records that one more has arrived. */
     @Override
     public void messageArrived(String topic, MqttMessage message) {
         System.out.println("Received: " + message.toString());
         pending.countDown();
     }

     /** Delivery confirmations are not used by this demo. */
     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {
         // Intentionally left empty.
     }

 }

这篇关于无法接收已发布的消息以订阅mqtt paho上的主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆