尝试使用Paho Client MqttCallback在messageArrived()中发布 [英] Trying to publish in messageArrived() with the Paho Client MqttCallback

查看:1501
本文介绍了尝试使用Paho Client MqttCallback在messageArrived()中发布的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在messageArrived(...)中发布对传入消息的响应.但是发布挂起和下一行:logOutgoingMessage(topic, message)从未被调用...最后,我陷入僵局,客户端断开连接.

I'm trying to publish the response to the incoming message in messageArrived(...). But the publish hang and the next line: logOutgoingMessage(topic, message) is never called... At the end i get a deadlock and the client disconnect.

这是我的代码:

@Startup
@Singleton
public class AppliMqttClient implements MqttCallback {

@EJB
private AppliFacade facade;

@PostConstruct
public void start() {
    try {
        // connection options
        connOpts = new MqttConnectOptions();
        connOpts.setKeepAliveInterval(120);         
        connOpts.setCleanSession(true);
        connOpts.setWill(TESTAMENT_TOPIC, "DOWN!!!!!!!!!!!!!!!!!!".getBytes(), 0, false);

        client = new MqttClient(BROKER_URL, MQTT_CLIENT_ID);
        client.setCallback(this);
        connect();

        client.subscribe(SUBSCRIPTION_TOPIC, QoS);
    } catch (MqttException me) {
        log.error("Connection to " + BROKER_URL + " failed");
        logMqttException(me);
    }

}

private void connect() {
    // Tying a cycle of reconnects.
    boolean tryConnecting = true;
    while (tryConnecting) {
        try {
            client.connect(connOpts);
        } catch (Exception e1) {
            log.error("Connection attempt failed with '" + e1.getCause() + "'. Retrying.");             
        }
        if (client.isConnected()) {
            log.info("Connected to Broker " + BROKER_URL);
            tryConnecting = false;
        } else {
            pause();
        }
    }
}

private void publishAMessage(String topic, String pubMsg) {
    MqttMessage message = new MqttMessage(pubMsg.getBytes());
    message.setQos(QoS);
    // Publish the message
    log.info("Publishing to topic \"" + topic + "\" qos " + QoS);
    try {
        // Publish to the broker
        client.publish(topic, message);
        // Wait until the message has been delivered to the broker
        logOutgoingMessage(topic, message);
    } catch (Exception e) {
        log.error("Publishing to topic \"" + topic + "\" qos " + QoS + "failed.", e);
    }
}

private String handleRquest(AbstractRequest request) throws JsonProcessingException {
    ...

    return jsonResp;
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // generate the response message ID
    messageId = "EB" + System.currentTimeMillis();

    // log the message
    logIncomingMessage(topic, message);

    // handle the message
    AbstractRequest request = getMapper().readValue(message.toString(), AbstractRequest.class);

    // handle the request
    String jsonResp = handleRquest(request);

    // publish message
    publishAMessage(request.getReplyTopic(), jsonResp);
}

/**
 * 
 * Method callback is invoked when a message published by this client is
 * successfully received by the broker.
 * 
 */
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    // NOT NEEDED
}

}

推荐答案

按以下步骤更改代码.

MqttDeliveryToken token;
...
MqttTopic mqttTopic = client.getTopic(topic);
try {
  // Publish to the broker
  token = mqttTopic.publish(new MqttMessage(pubMsg.getBytes()));
  logOutgoingMessage(topic, message);
  ...
 }

但是我不明白为什么第一个实现不起作用:x 可能在带有QoS 2的messageArrived()中发布不合适吗?

But I don't understand why the first implementation doesn't work :x May be publishing in messageArrived() with the QoS 2 is not appropriate ?

这篇关于尝试使用Paho Client MqttCallback在messageArrived()中发布的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆