Spring Boot Rabbitmq MappingJackson2MessageConverter自定义对象转换 [英] spring boot rabbitmq MappingJackson2MessageConverter custom object conversion

查看:81
本文介绍了Spring Boot Rabbitmq MappingJackson2MessageConverter自定义对象转换的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个简单的带有Spring Boot的Spring Boot应用程序,该应用程序可以生成"到Rabbitmq交换/队列的消息,以及另一个示例Spring Boot应用程序,可以使用"这些消息. 因此,我有两个应用程序(如果需要,也可以提供微服务). 1)生产者"微服务 2)消费者"微服务

I'm trying to create a simple spring boot app with spring boot that "produce" messages to a rabbitmq exchange/queue and another sample spring boot app that "consume" these messages. So I have two apps (or microservices if you wish). 1) "producer" microservice 2) "consumer" microservice

生产者"具有2个域对象. Foo和Bar,应将其转换为json并发送给rabbitmq. 消费者"应该接收并将json消息分别转换为Foo和Bar域. 由于某些原因,我无法完成此简单任务.关于这个的例子不多. 对于消息转换器,我想使用org.springframework.messaging.converter.MappingJackson2MessageConverter

The "producer" has 2 domain objects. Foo and Bar which should be converted to json and send to rabbitmq. The "consumer" should receive and convert the json message into a domain Foo and Bar respectively. For some reason I can not make this simple task. There are not much examples about this. For the message converter I want to use org.springframework.messaging.converter.MappingJackson2MessageConverter

这是我到目前为止所拥有的:

Here is what I have so far:

生产者微服务

package demo.producer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    Queue queue() {
        return new Queue("queue", false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("queue");
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... args) throws Exception {
        sender.sendToRabbitmq(new Foo(), new Bar());
    }
}

@Service
class Sender {

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;
    @Autowired
    private MappingJackson2MessageConverter mappingJackson2MessageConverter;

    public void sendToRabbitmq(final Foo foo, final Bar bar) {

        this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter);

        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo);
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar);

    }
}

class Bar {
    public int age = 33;
}

class Foo {
    public String name = "gustavo";
}

消费者微服务

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Autowired
    private Receiver receiver;

    @Override
    public void run(String... args) throws Exception {

    }

}

@Service
class Receiver {
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        System.out.println("Received <" + foo.name + ">");
    }

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        System.out.println("Received <" + bar.age + ">");
    }
}

class Foo {
    public String name;
}

class Bar {
    public int age;
}

这是我得到的例外:

    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)]
Bean [demo.consumer.Receiver@1672fe87]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113)
    ... 12 common frames omitted

该异常表示没有转换器,是的,我的问题是我不知道如何在消费者端设置 MappingJackson2MessageConverter 转换器(请注意,我想使用 org.springframework.messaging.converter.MappingJackson2MessageConverter ,而不是 org.springframework.amqp.support.converter.JsonMessageConverter )

The exception says there is no converter, and that is true, my problem is that I have no idea how to set the MappingJackson2MessageConverter converter in the consumer side (please note that I want to use org.springframework.messaging.converter.MappingJackson2MessageConverter and not org.springframework.amqp.support.converter.JsonMessageConverter)

有什么想法吗?

以防万一,您可以将此示例项目放在以下位置: https://github.com/gustavoorsi/rabbitmq-consumer-receiver

Just in case, you can fork this sample project at: https://github.com/gustavoorsi/rabbitmq-consumer-receiver

推荐答案

好,我终于可以正常工作了.

Ok, I finally got this working.

Spring使用 PayloadArgumentResolver 提取,转换并将转换后的消息设置为以 @RabbitListener 注释的方法参数.我们需要以某种方式将 mappingJackson2MessageConverter 设置到该对象中.

Spring uses a PayloadArgumentResolver to extract, convert and set the converted message to the method parameter annotated with @RabbitListener. Somehow we need to set the mappingJackson2MessageConverter into this object.

因此,在CONSUMER应用程序中,我们需要实现 RabbitListenerConfigurer .通过覆盖 configureRabbitListeners(RabbitListenerEndpointRegistrar registrar),我们可以设置自定义 DefaultMessageHandlerMethodFactory ,为此工厂设置消息转换器,然后工厂将创建我们的 PayloadArgumentResolver >具有正确的转换.

So, in the CONSUMER app, we need to implement RabbitListenerConfigurer. By overriding configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) we can set a custom DefaultMessageHandlerMethodFactory, to this factory we set the message converter, and the factory will create our PayloadArgumentResolver with the the correct convert.

这是代码段,我还更新了 git项目

Here is a snippet of the code, I've also updated the git project.

ConsumerApplication.java

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    @Autowired
    private Receiver receiver;

}

@Service
class Receiver {
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        System.out.println("Received <" + foo.name + ">");
    }

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        System.out.println("Received <" + bar.age + ">");
    }
}

class Foo {
    public String name;
}

class Bar {
    public int age;
}

因此,如果您运行Producer微服务,它将在队列中添加2条消息.一个代表Foo对象,另一个代表Bar对象. 通过运行使用者微服务,您将看到两者都被 Receiver 类中的相应方法使用.

So, if you run the Producer microservice it will add 2 messages in the queue. One that represent a Foo object and another that represent a Bar object. By running the consumer microservice you will see that both are consumed by the respective method in the Receiver class.

更新的问题:

我认为从我这边排队有一个概念上的问题.通过声明两个用 @RabbitListener 注释的方法指向同一队列,我想实现的目标是不可能的.上面的解决方案无法正常工作.如果您发送给rabbitmq,比如说6条Foo消息和3条Bar消息,则带有Foo参数的侦听器将不会收到6次.似乎侦听器是并行调用的,因此无法根据方法参数类型来区分要调用的侦听器. 我的解决方案(而且我不确定这是否是最好的方法,在这里我可以提出建议)是为每个实体创建一个队列. 所以现在,我有了 queue.bar queue.foo ,并更新了 @RabbitListener(queues ="queue.foo") 再次,我更新了代码,您可以在我的 git存储库中进行检查.

There is a conceptual problem about queuing from my side I think. What I wanted to achieved can not be possible by declaring 2 methods annotated with @RabbitListener that points to the same queue. The solution above was not working properly. If you send to rabbitmq, let say, 6 Foo messages and 3 Bar messages, they wont be received 6 times by the listener with Foo parameter. It seems that the listener are invoked in parallel so there is no way to discriminate which listener to invoke based on the method argument type. My solution (and I'm not sure if this is the best way, I'm open to suggestions here) is to create a queue for each entity. So now, I have queue.bar and queue.foo, and update @RabbitListener(queues = "queue.foo") Once again, I've updated the code and you can check it out in my git repository.

这篇关于Spring Boot Rabbitmq MappingJackson2MessageConverter自定义对象转换的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆