Spring集成MQTT发布和订阅多个主题 [英] Spring integration MQTT publish & subscribe to multiple topics

查看:2810
本文介绍了Spring集成MQTT发布和订阅多个主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试构建一个应用程序,该应用程序订阅多个mqtt主题,获取信息,对其进行处理并形成xml,并在处理后触发一个事件,以便可以将这些事件发送到某个云服务器,并从中成功响应.发送回mqtt频道.

I am trying to build an application which subscribes to multiple mqtt topics, get the information, process it and form xmls and upon processing trigger an event so that these can be sent to some cloud server and the successful response from there to be sent back to the mqtt channel.

<int-mqtt:message-driven-channel-adapter
    id="mqttAdapter" client-id="${clientId}" url="${brokerUrl}" topics="${topics}"
    channel="startCase" auto-startup="true" />

<int:channel id="startCase" />

<int:service-activator id="startCaseService"
        input-channel="startCase" ref="msgPollingService" method="pollMessages" />

    <bean id="mqttTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
    </bean>

    <bean id="msgPollingService" class="com.xxxx.xxx.mqttclient.mqtt.MsgPollingService">
        <property name="taskExecutor" ref="mqttTaskExecutor" />
        <property name="vendorId" value="${vendorId}" />
    </bean>

我的问题是如何将其发布到多个渠道,即是否可以选择将X消息发布到Y主题.目前,我有以下内容:

My question is how do I publish this to multiple channels, i.e. if I have an option to publish X message to Y topic. At present I have the below:

<int:channel id="outbound" />

<int-mqtt:outbound-channel-adapter
    id="mqtt-publish" client-id="kj" client-factory="clientFactory"
    auto-startup="true" url="${brokerUrl}" default-qos="0"
    default-retained="true" default-topic="${responseTopic}" channel="outbound" />

    <bean id="eventListner" class="com.xxxx.xxxx.mqttclient.event.EventListener">
        <property name="sccUrl" value="${url}" />
        <property name="restTemplate" ref="restTemplate" />
        <property name="channel" ref="outbound" />
    </bean>

我可以这样发布:

channel.send(MessageBuilder.withPayload("customResponse").build());

我可以做类似的事情吗?

Can I do something like:

channel.send(Message<?>, topic)

推荐答案

您的配置看起来不错.但是,MessageChannel是松耦合的抽象,并且仅处理Message.

Your configuration looks good. However the MessageChannel is an abstraction for loosely-coupling and gets deal only with Message.

因此,您要求a-la channel.send(Message<?>, topic)对于消息传递概念不正确.

So, you request a-la channel.send(Message<?>, topic) isn't correct for Messaging concepts.

但是,我们为您提供了一个窍门.来自AbstractMqttMessageHandler:

However we have a trick for you. From AbstractMqttMessageHandler:

String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
.....
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);

因此,您需要的代码是这样的:

So, what you need from your code is this:

channel.send(MessageBuilder.withPayload("customResponse").setHeader(MqttHeaders.TOPIC, topic).build());

换句话说,您应该发送带有mqtt_topic标头的Message来从<int-mqtt:outbound-channel-adapter>实现动态发布.

In other words you should send a Message with mqtt_topic header to achieve a dynamic publication from <int-mqtt:outbound-channel-adapter>.

另一方面,我们不建议直接从应用程序中使用MessageChannel.带有服务接口的<gateway>用于最终应用程序.其中topic可以是标记为@Header(MqttHeaders.TOPIC)

From other side we don't recommend to use MessageChannels directly from the application. The <gateway> with service interface is for such a case for end-application. Where that topic can be one of service method argument marked as @Header(MqttHeaders.TOPIC)

这篇关于Spring集成MQTT发布和订阅多个主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆