java - 如何在java中使用rabbitmq异步发送消息以将它们排队而无需等待spring amqp中的每条消息的回复? [英] How to send messages asynchronously to queue them up without waiting for reply of each message in spring amqp using rabbitmq in java?

查看:146
本文介绍了java - 如何在java中使用rabbitmq异步发送消息以将它们排队而无需等待spring amqp中的每条消息的回复?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 spring amqp 使用 rabbitmq,以下是我的配置.

I am trying to use rabbitmq using spring amqp, below is my configuration.

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"  routing-key="${rabbitmq.import.queue}"/>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener"  method="onMessage" />
</rabbit:listener-container>

这是一个简单的消息监听器类,

This is simple Message Listener class,

public class ImportMessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
        return message;
    }

}

这是producer(spring batch的itemWriter),

This is producer (which is itemWriter of spring batch),

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

当我运行我的 spring 批处理作业时,每条消息都被一条一条地发送和处理,我在响应下面得到了

When I run my spring batch job, each message is sent and handled one by one and I am gettig below response

consumer output: 1
producer output: 1

consumer output: 2
producer output: 2

consumer output: 3
producer output: 3

consumer output: 4
producer output: 4


consumer output: 5
producer output: 5

它应该发送 5 条消息并将它们排队,5 个消费者线程(concurrency=5)应该并发处理它们并在完成后立即响应

It shoudl send 5 messages and queue them up and 5 consumer threads (concurrency=5) should handle them concurrently and should respond as soon as it completes

So below should be the outout

consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5

producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5

我不想让生产者等待第一条消息的回复来排队第二条消息.

I dont want the producer to wait for the reply of first message to queue the second message.

我尝试使用 convertAndSend 这使得它异步(不等待回复)但是我如何在我的 itemWriter 中获得回复消息,就像我可以通过 convertSendAndReceive 获得一样?

I tried using convertAndSend which makes it Asynchronous (doesnt wait for reply) but how do I get reply message in my itemWriter like I can get with convertSendAndReceive ?

如果我将模板配置更改为

If I change my template configuration to

<rabbit:template id="importAmqpTemplate"
        connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
        routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
        <rabbit:reply-listener/>
    </rabbit:template>

如果我使用 template.convertAndSend(item.toString());那我怎样才能得到回复消息呢?

and if I use template.convertAndSend(item.toString()); then how can I get reply message ?

我无法将自己的消息处理程序附加到此侦听器以获取我们可以在消费者端附加的方式的回复消息.对于回复,它采用默认的 RabbitmqTemplate 处理程序.

I cant attach my own message handler to this listener to get reply message the way we can attach on consumer side. For reply, it takes default RabbitmqTemplate handler.

推荐答案

首先让我解释一下是怎么回事

First of all let me explain what's going on

您使用同步 sendAndReceive 操作.在您的消息将被发送之前,它被 TemporaryReplyQueue 丰富,并且发送者(生产者)线程正在阻塞以等待来自该 replyQueue 的回复.这就是为什么您的所有消息都被串行处理的原因.

You use synchronous sendAndReceive operation. Before your message will be send it is enriched with TemporaryReplyQueue and the sender (producer) Thread is blocking to wait the reply from that replyQueue. That's why your all messages are processed serially.

要继续,我们需要知道在单向 ItemWriter 生产者中使用该操作的原因.

To go ahead we need to know the reason to use that operation in the one-way ItemWriter producer.

也许 RabbitTemplate#convertAndSend 对你来说就足够了?

Maybe the RabbitTemplate#convertAndSend would be enough for you ?

更新

也许你只需要RabbitTemplate.ReturnCallbackconvertAndSend 一起确定您的消息是否已送达?

Maybe do you just need RabbitTemplate.ReturnCallback alongside with convertAndSend to determine, if your message was delivered or not?

更新 2

实现您使用 TaskExecutor 并行发送和接收的要求的另一个想法.这样你就不需要等待第一条消息发送第二条消息的依赖:

Another thought to achieve your requirement to use TaskExecutor to send and receive in parallel. With that you don't need to to wait the rely for first message to send the second:

final List<Object> replies = new ArrayList<Object>();
ExecutorService executor = Executors.newCachedThreadPool();
for (T item : items) {
     executor.execute(new Runnable() {

         public void run() {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
            replies.add(reply);
         }
     });
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

然后你可以对当前items的所有回复做一些事情.

And after that you can do something with all replies for current items.

这篇关于java - 如何在java中使用rabbitmq异步发送消息以将它们排队而无需等待spring amqp中的每条消息的回复?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆