Java:MQTT MessageProducerSupport to Flux [英] Java: MQTT MessageProducerSupport to Flux
本文介绍了Java:MQTT MessageProducerSupport to Flux的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个简单的MQTT客户端,它通过IntegrationFlow
:
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://test.mosquitto.org:1883" });
factory.setConnectionOptions(options);
return factory;
}
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
"myConsumer",
mqttClientFactory(),
"/test/#");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", received from MQTT")
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
我需要将所有收到的消息通过管道传输到Flux
以便进一步处理。
public Flux<String> mqttChannel() {
...
return mqttFlux;
}
我如何才能做到这一点?loggingHandler从IntegrationFlow接收所有消息。我的Flux不能以类似的方式获得它的输入吗-通过以某种方式将其传递给IntegrationFlowsHandle函数?
MQTT示例代码取自https://github.com/spring-projects/spring-integration-samples/blob/master/basic/mqtt/src/main/java/org/springframework/integration/samples/mqtt/Application.java尝试:遵循Artem bilans建议,我现在正在尝试使用toReactivePublisher
将入站IntegrationFlow
转换为Flux
。
public Flux<String> mqttChannel() {
Publisher<Message<Object>> flow = IntegrationFlows.from(mqttInbound())
.toReactivePublisher();
Flux<String> mqttFlux = Flux.from(flow)
.log()
.map(i -> "TESTING: Received a MQTT message");
return mqttFlux;
}
运行示例时出现以下错误:
10:14:39.541 [MQTT Call: myConsumer] ERROR o.s.i.m.i.MqttPahoMessageDrivenChannelAdapter - Unhandled exception for GenericMessage [payload=OFF,26.70,65.00,663,-62,192.168.2.100,0.026,25,4,6,7,933,278,27,4,1,0,1580496218,730573600,1800000,1980000,1580496218,730573600,10800000,11880000, headers={mqtt_receivedRetained=true, mqtt_id=0, mqtt_duplicate=false, id=3f7565aa-ff4f-c389-d8a9-712d4f06f1cb, mqtt_receivedTopic=/083B7036697886C41D2DF2FD919143EE/MasterBedroom/Sensor/, mqtt_receivedQos=0, timestamp=1602231279537}]
结论:第一条消息一到达,就会被错误处理并引发异常。
推荐答案
尚不清楚您希望使用该&Quot;My Flux&Quot;实现什么功能,以及效果如何,但对于您当前的配置,有几种解决方案。
您可以使用已经是Publisher
的FluxMessageChannel
,因此只需使用Flux.from()
和的订阅者即可使用上述MqttPahoMessageDrivenChannelAdapter
生成的数据。
IntegrationFlowBuilder
上使用toReactivePublisher()
,将整个流公开为反应性Publsiher
源。当然,在这种情况下,您不能使用LoggingHandler
,因为它是单向的,使您的流正好在这里结束。您可以考虑改用log()
运算符:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-log
顺便说一句,FluxMessageChannel
是发布-订阅的,因此您可以将其放在这些日志的流中,也可以将其放在外部用于Flux.from()
订阅。此频道的所有订阅者都将收到相同的消息。
这篇关于Java:MQTT MessageProducerSupport to Flux的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文