带有 2 个队列的 Spring AMQP 项目 [英] Spring AMQP Project with 2 queue's

查看:47
本文介绍了带有 2 个队列的 Spring AMQP 项目的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开展一个项目,该项目涉及 2 个队列,以及多个与它们交互的侦听器.流程:

I'm working on a project that involves 2 queue's, and multiples Listeners interacting with them. Flow:

  • 新的 HTTP 请求来到服务器,然后它被转换成一个对象作为消息
  • 此消息必须在两个队列中发布
  • 我有两种类型的监听器,它们从每个队列中获取消息,然后我可以做任何我想做的事情

我一直在阅读,最好的方法是使用扇出交换.这是我的代码:

I've been reading and the best way to do is with a fanout-exchange. Here is my code:

侦听器配置.xml

<!-- CREATE CONNECTION FACTORY -->
<rabbit:connection-factory id="connectionFactory"
    host="localhost" username="guest" password="guest" />

<rabbit:admin connection-factory="connectionFactory" />

<!-- <!-- RABBIT QUEUE'S -->
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false" auto-startup=false
    durable="true" />
<!-- Webapp Queue -->
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false" auto-startup=false
    durable="true" /> 

<!-- CREATE AN EXCHANGE AND BIND THE QUEUE WITH MY.ROUTINGKEY.* TO THE EXCHANGE -->
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding>
        <rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- CREATE THE RABBIT TEMPLATES -->
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute.rabbit.queue"/>
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute2.rabbit.queue"/>

<!-- INSTANTIATE THE LISTENERS -->
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" />
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" />

<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />

<!-- GLUE THE LISTENER AND QUEUE TO THE LISTENER CONTAINER -->
<rabbit:listener-container id="listenerContainer"
    connection-factory="connectionFactory" message-converter="jsonMessageConverter">
    <rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" />
    <rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" />
</rabbit:listener-container>

sender-configuration.xml

sender-configuration.xml

<!--  First following line creates a rabbit connection factory with specified parameters -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" />

<!-- Obtain admin rights to create an exchange -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- Create a bean which can send message to trashroute-exchange for the Java program to call -->
<rabbit:template id="template" connection-factory="connectionFactory"  exchange="myExchange"
message-converter="jsonMessageConverter" />


<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
    <bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/>
</property>

监听器 MainConfiguration.java

Listener MainConfiguration.java

@Configuration
public class MainConfiguration {

protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public DataController DataController(){
    return new DataController();
}

@Bean
// Every queue is bound to the default direct exchange
public Queue persistenceQueue() { 
    //Create a new queue with an specific name and the durability value in true.
    return new Queue(this.persistenceQueue, true);
}

@Bean
public Queue webappQueue() {
    //Create a new queue with an specific name and the durability value in true.
    return new Queue(this.webappQueue, true);
}
}

发件人 MainConfiguration.java

Sender MainConfiguration.java

@Configuration
public class SenderConfiguration {

protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";

//Create the Template
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setMessageConverter(new JsonMessageConverter());
    return template;
}

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
            "localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

@Bean
public IServiceManager scheduledProducer() {
    return new ServiceManagerImpl();
}

@Bean
public BeanPostProcessor postProcessor() {
    return new ScheduledAnnotationBeanPostProcessor();
}

}

谁能告诉我我做错了什么?两个监听器中的一个完美运行,第二个从不读取消息.

Can anyone please tell me what I'm doing wrong? One of the two Listeners, works perfectly, the second never reads a message.

推荐答案

基于上面解释的场景,我尝试创建一个使用 Spring Java Config 的示例应用程序.

Based on the scenario explained above I have tried to create a sample application which uses Spring Java Config.

消息被发布到 trashroutewebapp 队列,并且各自的接收者(persistencewebapp)接收消息.

Messages are published to trashroute and webapp queues, and respective receivers (persistence and webapp) receive the messages.

RabbitConfiguration.java(包含发送方和接收方的配置)

RabbitConfiguration.java (Contains configuration for both Sender and Receiver)

@Configuration
@EnableRabbit
public class RabbitConfiguration {

    public static final String BROADCAST_TRASHROUTE_QUEUE = "trashroute.rabbit.queue";
    public static final String BROADCAST_WEBAPP_QUEUE = "webapp.rabbit.queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }


    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public Queue trashRouteQueue() {
        return new Queue(BROADCAST_TRASHROUTE_QUEUE);
    }

    @Bean
    public Queue webAppQueue() {
        return new Queue(BROADCAST_WEBAPP_QUEUE);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    @Bean
    public FanoutExchange trashRouteExchange() {
        FanoutExchange exchange = new FanoutExchange("trashroute");
        return exchange;
    }

    @Bean
    public Binding trashRouteBinding() {
        return BindingBuilder.bind(trashRouteQueue()).to(trashRouteExchange());
    }

    @Bean
    public Binding webAppBinding() {
        return BindingBuilder.bind(webAppQueue()).to(trashRouteExchange());
    }

    @Bean
    SimpleMessageListenerContainer persistenceListenerContainer(ConnectionFactory connectionFactory, @Qualifier("persistenceListenerAdapter") MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(trashRouteQueue(), webAppQueue());
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter persistenceListenerAdapter(PersistenceListener receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    SimpleMessageListenerContainer webAppListenerContainer(ConnectionFactory connectionFactory, @Qualifier("webAppListenerAdapter") MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(trashRouteQueue(), webAppQueue());
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter webAppListenerAdapter(WebAppListener webAppListener) {
        return new MessageListenerAdapter(webAppListener, "receiveMessage");
    }

    @Bean
    PersistenceListener persistenceListener() {
        return new PersistenceListener();
    }

    @Bean
    WebAppListener webAppListener() {
        return new WebAppListener();
    }

}

PersistenceListener.java

PersistenceListener.java

public class PersistenceListener {

    public void receiveMessage(String message) {
        System.out.println("Persistence Listener: Messsage Received <" + message + ">");
    }
}

WebAppListener.java

WebAppListener.java

public class WebAppListener {
    public void receiveMessage(String message) {
        System.out.println("WebAppListener: Message Received <" + message + ">");
    }
}

应用程序.java

@SpringBootApplication
public class Application implements CommandLineRunner {

    @Autowired
    AnnotationConfigApplicationContext context;

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Waiting five seconds...");
        Thread.sleep(5000);
        System.out.println("Sending message...");

        RabbitTemplate rabbitTemplate = (RabbitTemplate) context.getBean("rabbitTemplate");

        rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_TRASHROUTE_QUEUE, "Hello from trashroute queue!");
        rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_WEBAPP_QUEUE, "Hello from webapp queue!");

        Thread.sleep(10000);
        context.close();
    }
}

希望这会有所帮助.虽然如果您想在生产中使用它,您需要重构代码.

Hope this will help. Although you would need to refactor the code if you want to use this in production.

这篇关于带有 2 个队列的 Spring AMQP 项目的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆