spring amqp rabbitmq MessageListener无效 [英] spring amqp rabbitmq MessageListener not working

查看:2293
本文介绍了spring amqp rabbitmq MessageListener无效的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用spring amqp使用rabbitmq,下面是我的配置。

I am trying to use rabbitmq using spring amqp, below is my configuration.

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />

<beans:bean id="importExchangeMessageListener"
    class="com.stockopedia.batch.foundation.ImportMessageListener" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>

这是一个简单的Message Listener类,

This is simple Message Listener class,

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ImportMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
    }

}

这是生产者(即itemWriter)春季批次),

This is producer (which is itemWriter of spring batch),

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

当我运行我的Spring批处理作业,调用ImportItemWriter.write。但是ImportMessageListener.onMessage不起作用。它不打印消息。我得到控制台上所有项目的输出

When I run my spring batch job, ImportItemWriter.write gets called. But ImportMessageListener.onMessage does not work. It doesnt print the message. I get below output for all items on console

producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null


推荐答案

您的消费者未发送结果...

Your consumer is not sending a result...

@Override
public void onMessage(Message message) {
    System.out.println("consumer output: " + message);
}

将其更改为简单的POJO;容器的 MessageListenerAdapter 将为您处理转换,并发送结果。

Change it to a simple POJO; the container's MessageListenerAdapter will take care of the conversion for you, and send the result.

@Override
public String handleMessage(String message) {
    System.out.println("consumer output: " + message);
    return "result";
}

编辑:

您还没有设置任何交换或路由到您的队列。如果您想使用默认的交换/路由,请使用...

You also haven't set up any exchange or routing to your queue. If you want to use default exchange/routing, use...

convertSendAndReceive("", queueName, item.toString());

EDIT2:

或者,设置模板上的 routingKey 到队列名称然后你可以使用更简单的方法。

Or, set the routingKey on the template to the queue name and then you can use the simpler method.

... sendAndReceive()方法用于请求/回复方案,因此需要阻塞。要异步执行,您必须使用 ... send()方法之一,并连接自己的 SimpleListenerContainer 收到回复;你必须做自己的关联。使用

The ...sendAndReceive() methods are meant for request/reply scenarios so blocking is required. To do it asynchronously, you have to use one of the ...send() methods, and wire up your own SimpleListenerContainer to receive the replies; you will have to do your own correlation. Use

public void convertAndSend(Object message, MessagePostProcessor postProcessor)

并在邮件处理器中设置 replyTo correlationId header ...

and in your message post processor, set the replyTo and correlationId headers...

message.getMessageProperties().setReplyTo("foo");
message.getMessageProperties().setCorrelationId("bar");

或者,自己构建消息对象(例如,使用 MessageBuilder )并使用发送方法...

Or, build the Message object yourself (e.g by using the MessageBuilder) and use the send method...

template.send(MessageBuilder.withBody("foo".getBytes())
            .setReplyTo("bar")
            .setCorrelationId("baz".getBytes())
            .build());

每个请求都需要一个唯一的 correlationId 所以你可以关联响应。

Each request needs a unique correlationId so you can correlate the response.

这篇关于spring amqp rabbitmq MessageListener无效的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆