Spring IntegrationFlow http request to amqp queue

Question

I am trying to convert the following Spring Integration project to a Java config version using the Spring Integration DSL. I'm not having much luck, and I can't find documentation on the DSL that helps me understand the framework well enough to get through this.

Here's the project I'm converting. It uses XML config: https://github.com/dsyer/http-amqp-tunnel

Basically, it takes an HTTP request and tunnels it through RabbitMQ to a target app on the other side. A good description of what the project should do can be found at the link above.

The main differences between my app and the one on GitHub listed above are that mine is based on Spring Boot 1.5.1.RELEASE, while the original is on 1.1.4.BUILD-SNAPSHOT. Also, the original project uses the Spring Integration XML namespace support (int-http:inbound-gateway, int-http:outbound-gateway, int-amqp:outbound-gateway and int-amqp:inbound-gateway), whereas I'm using the IntegrationFlow DSL in a Java config.

My code never even puts a message on RabbitMQ, and I get a timeout exception in the browser, so I think my IntegrationFlow setup is incorrect. I have added wire taps that log the requests, and I only see the output of one wire tap when I hit the app from a browser.

Any help would be appreciated.

Updated config and error

package org.springframework.platform.proxy;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.*;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.http.Http;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageHandler;
import org.springframework.web.client.RestTemplate;

@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableIntegration
public class TunnelApplication 
{
    public static void main(String[] args) 
    {
        SpringApplication.run(TunnelApplication.class, args);
    }

    @Value("${urlExpression}")
    private String urlExpression;

    @Value("${targetUrl}")
    private String targetUrl;

    @Value("${outboundQueue}")
    private String outboundQueue;

    @Value("${inboundQueue}")
    private String inboundQueue;

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public Queue requestQueue() 
    {
        return new Queue(outboundQueue, true, false, true);
    }

    @Bean
    public Queue targetQueue() 
    {
        return new Queue(inboundQueue, true, false, true);
    }

    @Bean
    public RestTemplate safeRestTemplate()
    {
        return new RestTemplate();
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }


    @Bean
    public AmqpTemplate amqpTemplate()
    {
        RabbitTemplate result = new RabbitTemplate(rabbitConnectionFactory);
        result.setMessageConverter(jsonMessageConverter());
        return result;
    }

    @Bean
    public IntegrationFlow httpInboundGateway()
    {
        return IntegrationFlows
                .from(Http.inboundGateway("/tunnel"))
                .handle(
                        Amqp.outboundAdapter(amqpTemplate())
                            .mappedRequestHeaders("http_*")
                            .routingKey(outboundQueue)
//                          .routingKeyExpression("headers['routingKey']")
                        )
                .wireTap(f->f.handle(logger("outbound")))
                .get();
    }

    @Bean
    public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) 
    {
        return IntegrationFlows.from
                (
                    Amqp.inboundGateway(connectionFactory, inboundQueue)
                        .mappedRequestHeaders("http_*")
                        .messageConverter(jsonMessageConverter())
                )
                .handle(Http.outboundGateway(targetUrl))
                .wireTap(f->f.handle(logger("inbound")))
                .get();
    }


    @Bean
    public MessageHandler logger(String name) 
    {
         LoggingHandler loggingHandler =  new LoggingHandler(LoggingHandler.Level.INFO.name());
         loggingHandler.setLoggerName(name);
         return loggingHandler;
    }
}

The following error message keeps getting printed, and a message stays on RabbitMQ while the app is running. It looks like the app is pulling the message off the queue, getting an error, and then putting it back on. That concerns me because I want any errors to propagate back to the originating client and not bog down the server.

2017-02-06 16:00:12.167  INFO 10264 --- [nio-9000-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-02-06 16:00:12.167  INFO 10264 --- [nio-9000-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2017-02-06 16:00:12.190  INFO 10264 --- [nio-9000-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 23 ms
2017-02-06 16:00:16.806  INFO 10264 --- [erContainer#0-1] outbound                                 : <200 OK,{X-Application-Context=[application], Content-Type=[text/html;charset=UTF-8], Content-Length=[14], Date=[Mon, 06 Feb 2017 22:00:16 GMT]}>
2017-02-06 16:00:16.810  WARN 10264 --- [erContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_66]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test:9000.amqpInboundGateway.channel#1'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=<200 OK,{X-Application-Context=[application], Content-Type=[text/html;charset=UTF-8], Content-Length=[14], Date=[Mon, 06 Feb 2017 22:00:16 GMT]}>, headers={http_requestMethod=GET, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2eb9b1c6, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2eb9b1c6, amqp_consumerQueue=request, http_requestUrl=http://localhost:9000/tunnel/, id=bcb94ed9-45fc-c333-afee-de6e20a9f1b5, Content-Length=14, amqp_consumerTag=amq.ctag-ncEDSKdgWNKQk-jhGfqsbw, contentType=text/html;charset=UTF-8, http_statusCode=200, Date=1486418416000, timestamp=1486418416805}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:441) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:409) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$400(AmqpInboundGateway.java:52) ~[spring-integration-amqp-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$1.onMessage(AmqpInboundGateway.java:154) ~[spring-integration-amqp-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    ... 10 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    ... 35 common frames omitted

Config based on Gary's comment, modified to hit the /beans endpoint of a Spring Boot actuator:

@Bean
public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) {
    return IntegrationFlows.from(Http.inboundGateway("/tunnel"))
            .log()
            .handle(Amqp.outboundGateway(amqpTemplate).routingKey(queue().getName()))
            .log()
            .bridge(null)
            .get();
}

@Bean
public Queue queue() {
    return new AnonymousQueue();
}

@Bean
public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue()))
            .log()
            .handle(Http.outboundGateway("http://localhost:8080/beans")
                    .expectedResponseType(String.class))
            .log()
            .bridge(null)
            .get();
}

@Bean
public IntegrationFlow finalWeb() {
    return IntegrationFlows.from(Http.inboundGateway("/beans"))
            .log()
            .<String, String>transform(String::toUpperCase)
            .log()
            .bridge(null)
            .get();
}

Recommended answer

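import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.http.Http;

@SpringBootApplication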
public class So42077149Application {

    public static void main(String[] args) {
        SpringApplication.run(So42077149Application.class, args);
    }

    @Bean
    public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) {
        return IntegrationFlows.from(Http.inboundGateway("/foo"))
                .log()
                .handle(Amqp.outboundGateway(amqpTemplate).routingKey(queue().getName()))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public Queue queue() {
        return new AnonymousQueue();
    }

    @Bean
    public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue()))
                .log()
                .handle(Http.outboundGateway("http://localhost:8080/bar")
                        .expectedResponseType(String.class))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow finalWeb() {
        return IntegrationFlows.from(Http.inboundGateway("/bar"))
                .log()
                .<String, String>transform(String::toUpperCase)
                .log()
                .bridge(null)
                .get();
    }


}

Result:

$ curl -H "Content-Type: text/plain" -d foo localhost:8080/foo
FOO
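
One way to read the difference between the question's first configuration and the flows above: an inbound gateway waits for a reply, so whatever sits downstream has to produce one. A one-way adapter (such as `Amqp.outboundAdapter`) consumes the message and returns nothing, which would leave the HTTP caller waiting until it times out, whereas a request/reply gateway (such as `Amqp.outboundGateway`) sends a reply back. The sketch below is a plain-Java model of that idea only. It is not Spring Integration code, and every name in it (`GatewayVsAdapterSketch`, `inboundGateway`, `oneWayAdapter`, `requestReplyGateway`) is hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Plain-Java model only; none of these names are Spring Integration APIs.
class GatewayVsAdapterSketch {

    // Models an inbound gateway: hands the request to the downstream step,
    // then blocks until a reply arrives or the timeout expires.
    static Optional<String> inboundGateway(String request,
            BiConsumer<String, CompletableFuture<String>> downstream,
            long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        downstream.accept(request, reply);
        try {
            return Optional.of(reply.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // nobody replied: the caller sees a timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Models a one-way outbound adapter: the request is consumed
    // and the reply is never completed.
    static void oneWayAdapter(String request, CompletableFuture<String> reply) {
        // send and forget
    }

    // Models a request/reply outbound gateway: the reply is completed
    // with the (here: upper-cased) response.
    static void requestReplyGateway(String request, CompletableFuture<String> reply) {
        reply.complete(request.toUpperCase());
    }

    public static void main(String[] args) {
        // One-way downstream: the gateway gives up after the timeout.
        System.out.println(inboundGateway("foo", GatewayVsAdapterSketch::oneWayAdapter, 200));
        // Request/reply downstream: the gateway gets its answer.
        System.out.println(inboundGateway("foo", GatewayVsAdapterSketch::requestReplyGateway, 200));
    }
}
```

In this model the first call yields an empty `Optional` after the timeout and the second yields the upper-cased reply, which mirrors the browser timeout in the question versus the `FOO` response above.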

EDIT

Using JSON...

@SpringBootApplication
public class So42077149Application {

    public static void main(String[] args) {
        SpringApplication.run(So42077149Application.class, args);
    }

    @Bean
    public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) {
        return IntegrationFlows.from(Http.inboundGateway("/foo"))
                .log()
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .routingKey(queue().getName())
                        .mappedRequestHeaders("*")
                        .mappedReplyHeaders("*"))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public Queue queue() {
        return new AnonymousQueue();
    }

    @Bean
    public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue()))
                .log()
                .handle(Http.outboundGateway("http://localhost:8080/bar")
                        .mappedRequestHeaders("*")
                        .mappedResponseHeaders("*")
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(Map.class))
                .log()
                .log(Level.INFO, "payloadClass", "payload.getClass()")
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow finalWeb() {
        return IntegrationFlows.from(Http.inboundGateway("/bar"))
                .log()
                .transform("{ \"foo\" : \"bar\" }")
                .enrichHeaders(h -> h.header("contentType", "application/json"))
                .log()
                .bridge(null)
                .get();
    }

}
