使用Spring AMQP接收和发送Java对象 [英] Receive and Send Java Objects with Spring AMQP

查看:1187
本文介绍了使用Spring AMQP接收和发送Java对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想实现Spring AMQP示例,用于使用侦听器发送和接收Java对象。我试过这个:

I want to implement Spring AMQP example for sending and receiving Java Objects using listener. I tried this:

发送Java对象

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)).to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));              
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));

接收并发回另一个Java对象:

Receive and send Back another Java Object:

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
                .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = (TransactionsBean) template.receiveAndConvert(QUEUE_PROCESSING_TRANSACTION);
System.out.println(" !!!!!!! Received id " + obj.getTransaction_id());

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(new Queue(QUEUE_PROCESSING_TRANSACTION, false));

container.setMessageListener(new MessageListener() {
  @Override
  public void onMessage(Message message) {
    // Receive here Java object and send back another object
  }
});

你能告诉我如何在没有复杂注释的情况下扩展代码只是简单的监听器吗?

Can you show me how to extend the code without complex annotations just simple listeners?

推荐答案

最简单的方法是使用 @RabbitListener - 使用Spring Boot时更容易因为他将连接基础设施bean(模板,管理员等)。

The simplest way is to use a @RabbitListener - made even easier when using Spring Boot since he will wire up infrastructure beans (template, admin, etc).

@SpringBootApplication
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    @RabbitListener(queues = So51009346Application.QUEUE_PROCESSING_TRANSACTION)
    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}

如果你不想出于某种原因使用该注释,您可以使用MessageListenerAdapter连接容器...

If you don't want to use that annotation for some reason, you can wire up a container using a MessageListenerAdapter...

@SpringBootApplication
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        container.setMessageListener(new MessageListenerAdapter(listener, "process"));
        return container;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}

当然,您可以连线在你的问题中,使用适配器自己启动容器,但通常最好让Spring将其作为 @Bean 进行管理,否则你会错过一些功能(例如事件发布)对于失败,闲置容器)。适配器获取对您的请求/回复侦听器的引用以及要调用的方法名称。

You can, of course, wire up the container yourself, as in your question, using the adapter, but it's generally better to let Spring manage it as a @Bean or you will miss some functionality (e.g. event publishing for failures, idle container). The adapter gets a reference to your request/reply listener and the method name to call.

这篇关于使用Spring AMQP接收和发送Java对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆