Spring integration with Rabbit AMQP for "Client sends message -> Server receives & returns msg on return queue -> Client gets correlated msg"

Question

I am able to write a Java program with the RabbitMQ Java client API that does the following:

  1. The client sends a message over a RabbitMQ exchange/queue with a correlation ID (say, the UUID "348a07f5-8342-45ed-b40b-d44bfd9c4dde").

  2. The server receives the message.

  3. The server sends a response message over a RabbitMQ exchange/queue with the same correlation ID, "348a07f5-8342-45ed-b40b-d44bfd9c4dde".

  4. The client receives the correlated message only, in the same thread as step 1.

Below are Send.java and Recv.java using the Rabbit API. I need help converting this sample to Spring AMQP, especially the receiving part in step 4. I am looking for something like a receive method that can filter messages by correlation ID.

Send.java

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Send {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
        String message = "Hello World!";
        String cslTransactionId = UUID.randomUUID().toString();
        BasicProperties properties = (new BasicProperties.Builder())
            .correlationId(cslTransactionId)
            .replyTo(RESPONSE_QUEUE).build();

        channel.basicPublish("", REQUEST_QUEUE, properties, message.getBytes());

        System.out.println("Client Sent '" + message + "'");


        Channel responseChannel = connection.createChannel();
        responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(responseChannel);
        responseChannel.basicConsume(RESPONSE_QUEUE, false, consumer);
        String correlationId = null;
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String responseMessage = new String(delivery.getBody());
            correlationId = delivery.getProperties().getCorrelationId();
            System.out.println("Correlation Id:" + correlationId);
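            // Compare from the known non-null side in case a reply carries no correlation ID.
            if (cslTransactionId.equals(correlationId)) {
                System.out.println("Client Received '" + responseMessage + "'");
                // multiple=true also acknowledges every earlier, non-matching delivery on this channel.
                responseChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);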
                break;
            }
        }

        channel.close();
        connection.close();
    }
}

Recv.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(REQUEST_QUEUE, true, consumer);
        String correlationId = null;
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            correlationId = delivery.getProperties().getCorrelationId();
            System.out.println("Correlation Id:" + correlationId);
            System.out.println("Server Received '" + message + "'");
            if (correlationId != null) {
                break;
            }
        }

        String responseMsg = "Response Message";
        Channel responseChannel = connection.createChannel();
        responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
        BasicProperties properties = (new BasicProperties.Builder())
            .correlationId(correlationId).build();

        channel.basicPublish("", RESPONSE_QUEUE, properties, responseMsg.getBytes());

        System.out.println("Server Sent '" + responseMsg + "'");

        channel.close();
        connection.close();
    }
}

After running the Java configuration provided by Gary, I tried moving the configuration to XML in order to add a listener on the server side. Here is the XML configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean 
        id="serviceListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="queues" ref="requestQueue"/>
            <property name="messageListener" ref="messageListenerAdaptor"/>
    </bean>

    <bean id="messageListenerAdaptor"
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="pojoListener" />
    </bean>

    <bean 
        id="pojoListener"
        class="PojoListener"/>

    <bean
        id="replyListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="queues" ref="replyQueue"/>
        <property name="messageListener" ref="fixedReplyQRabbitTemplate"/>
    </bean>

    <!-- Infrastructure -->
    <rabbit:connection-factory 
        id="connectionFactory" 
        host="localhost" 
        username="guest" 
        password="guest" 
        cache-mode="CHANNEL" 
        channel-cache-size="5"/>

    <rabbit:template 
        id="fixedReplyQRabbitTemplate" 
        connection-factory="connectionFactory"
        exchange="fdms.exchange"
        routing-key="response.key"
        reply-queue="RESPONSE.QUEUE">
        <rabbit:reply-listener/>
    </rabbit:template>

    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue id="requestQueue" name="REQUEST.QUEUE" />
    <rabbit:queue id="replyQueue" name="RESPONSE.QUEUE" />

    <rabbit:direct-exchange name="fdms.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="RESPONSE.QUEUE" key="response.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>

SpringReceive.java

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringReceive {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("cslclient.xml");
        SimpleMessageListenerContainer serviceListenerContainer =
            context.getBean("serviceListenerContainer", SimpleMessageListenerContainer.class);
        serviceListenerContainer.start();
    }
}
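
The XML configuration references a PojoListener class that the post does not show. The following is a hypothetical minimal sketch of such a delegate, not the asker's actual class. It assumes the adapter is left on its default listener method name, handleMessage, and that the payload arrives either as a String (text content type) or as raw bytes (no content type set, as in Send.java). The response text is borrowed from Recv.java.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical delegate for the MessageListenerAdapter bean; not from the original post.
class PojoListener {

    // Called when the converted payload is a String (for example, content type text/plain).
    public String handleMessage(String request) {
        System.out.println("Server Received '" + request + "'");
        return "Response Message";
    }

    // Called when the payload arrives as raw bytes, as it would for a message
    // published without a content type.
    public String handleMessage(byte[] request) {
        return handleMessage(new String(request, StandardCharsets.UTF_8));
    }
}
```

When the delegate returns a non-null value, MessageListenerAdapter sends it to the reply-to address carried by the request and copies the request's correlation ID onto the reply, so the listener code does not need to handle either property itself.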

Recommended answer

You can use RabbitTemplate.sendAndReceive() (or convertSendAndReceive()) with a reply listener container (see the Spring AMQP reference documentation); the template will take care of the correlation for you.
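
As a rough illustration of that setup, here is a client-side Java configuration sketch. It is not the configuration from the original answer. It assumes a Spring AMQP version in which RabbitTemplate has setReplyAddress (older releases configured a reply Queue object instead), a broker on localhost, and non-durable queues as declared in Send.java. The class name ClientConfig is made up; the bean and queue names are taken from the question.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Illustrative client-side configuration; the class name is hypothetical.
@Configuration
public class ClientConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    // Declares the Queue beans below on the broker when a connection is opened.
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public Queue requestQueue() {
        return new Queue("REQUEST.QUEUE", false); // non-durable, as in Send.java
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("RESPONSE.QUEUE", false);
    }

    @Bean
    public RabbitTemplate fixedReplyQRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setRoutingKey("REQUEST.QUEUE");    // default exchange routes by queue name
        template.setReplyAddress("RESPONSE.QUEUE"); // fixed reply queue (assumed API, see above)
        template.setReplyTimeout(10000);            // milliseconds
        return template;
    }

    // The template itself is registered as the listener on the reply queue.
    @Bean
    public SimpleMessageListenerContainer replyListenerContainer(
            ConnectionFactory connectionFactory, RabbitTemplate fixedReplyQRabbitTemplate) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("RESPONSE.QUEUE");
        container.setMessageListener(fixedReplyQRabbitTemplate);
        return container;
    }
}
```

With a context built from this class, calling fixedReplyQRabbitTemplate.convertSendAndReceive("Hello World!") blocks the calling thread until the reply with the matching correlation ID arrives or the reply timeout elapses, which covers step 4 of the question without a hand-written filter loop.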

If you are using Spring Integration, use an outbound gateway with an appropriately configured Rabbit template.
