Java:MQTT MessageProducerSupport to Flux [英] Java: MQTT MessageProducerSupport to Flux

查看:23
本文介绍了Java:MQTT MessageProducerSupport to Flux的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的MQTT客户端,它通过IntegrationFlow

输出收到的消息
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { "tcp://test.mosquitto.org:1883" });
    factory.setConnectionOptions(options);
    return factory;
}

public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            "myConsumer",
            mqttClientFactory(),
            "/test/#");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p + ", received from MQTT")
            .handle(logger())
            .get();
}

private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("siSample");
    return loggingHandler;
}

我需要将所有收到的消息通过管道传输到Flux以便进一步处理。

public Flux<String> mqttChannel() {
    ...
    return mqttFlux;
}

我如何才能做到这一点?loggingHandler从IntegrationFlow接收所有消息。我的Flux不能以类似的方式获得它的输入吗-通过以某种方式将其传递给IntegrationFlowsHandle函数?

MQTT示例代码取自https://github.com/spring-projects/spring-integration-samples/blob/master/basic/mqtt/src/main/java/org/springframework/integration/samples/mqtt/Application.java

尝试:遵循Artem bilans建议,我现在正在尝试使用toReactivePublisher将入站IntegrationFlow转换为Flux

public Flux<String> mqttChannel() {
    Publisher<Message<Object>> flow = IntegrationFlows.from(mqttInbound())
            .toReactivePublisher();
    Flux<String> mqttFlux = Flux.from(flow)
            .log()
            .map(i -> "TESTING: Received a MQTT message");
    return mqttFlux;
}

运行示例时出现以下错误:

10:14:39.541 [MQTT Call: myConsumer] ERROR o.s.i.m.i.MqttPahoMessageDrivenChannelAdapter - Unhandled exception for GenericMessage [payload=OFF,26.70,65.00,663,-62,192.168.2.100,0.026,25,4,6,7,933,278,27,4,1,0,1580496218,730573600,1800000,1980000,1580496218,730573600,10800000,11880000, headers={mqtt_receivedRetained=true, mqtt_id=0, mqtt_duplicate=false, id=3f7565aa-ff4f-c389-d8a9-712d4f06f1cb, mqtt_receivedTopic=/083B7036697886C41D2DF2FD919143EE/MasterBedroom/Sensor/, mqtt_receivedQos=0, timestamp=1602231279537}]

结论:第一条消息一到达,就会被错误处理并引发异常。

推荐答案

请阅读此文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/reactive-streams.html#reactive-streams

尚不清楚您希望使用该&Quot;My Flux&Quot;实现什么功能,以及效果如何,但对于您当前的配置,有几种解决方案。

您可以使用已经是PublisherFluxMessageChannel,因此只需使用Flux.from()和的订阅者即可使用上述MqttPahoMessageDrivenChannelAdapter生成的数据。

另一种方法是在IntegrationFlowBuilder上使用toReactivePublisher(),将整个流公开为反应性Publsiher源。当然,在这种情况下,您不能使用LoggingHandler,因为它是单向的,使您的流正好在这里结束。您可以考虑改用log()运算符:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-log

顺便说一句,FluxMessageChannel是发布-订阅的,因此您可以将其放在这些日志的流中,也可以将其放在外部用于Flux.from()订阅。此频道的所有订阅者都将收到相同的消息。

这篇关于Java:MQTT MessageProducerSupport to Flux的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆