How to avoid shutdown of SimpleMessageListenerContainer in case of unexpected errors?

Problem description

I am using Spring Boot 1.4.0 and 'spring-boot-starter-amqp' to connect to RabbitMQ. The message producer, the consumer, and the RabbitMQ server are all under my control. Things worked fine for a few months in production, but suddenly my consumer stopped with the exceptions given below. As I only produce messages that are always valid, I have no clue what went wrong.

This caused my listener container to shut down, so message processing stopped. I had to restart my message-consumer program manually.

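So my questions are:

  1. Can we avoid a complete shutdown of the listener container in any unexpected case?
  2. Is it possible to gracefully discard such messages and keep the listener container alive?
  3. If not, is there a way to check whether all my listener containers are running, and to start them if I find them dead? (Note: I looked at RabbitListenerEndpointRegistry.getListenerContainers(), but it does not seem to include SimpleMessageListenerContainer containers.)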

Exception logs:

2017-02-20 12:42:18.441 ERROR 18014 --- [writeToDBQQueueListenerContainer-17] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.
java.lang.NoClassDefFoundError: org/springframework/messaging/handler/annotation/support/MethodArgumentNotValidException
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.causeIsFatal(ConditionalRejectingErrorHandler.java:110) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.isFatal(ConditionalRejectingErrorHandler.java:97) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:72) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:625) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:852) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:685) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ClassNotFoundException: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
    at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:89) ~[KattaQueueManager-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
    ... 11 common frames omitted

And another exception:

2017-02-20 12:42:18.674 ERROR 18014 --- [imageQueueListenerContainer-53] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.

java.lang.NoClassDefFoundError: com/rabbitmq/utility/Utility
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.checkShutdown(BlockingQueueConsumer.java:348) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:402) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1160) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.utility.Utility
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
        at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:89) ~[KattaQueueManager-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
        ... 7 common frames omitted

2017-02-20 12:42:18.675 ERROR 18014 --- [imageQueueListenerContainer-53] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer

My consumer sample code:

@Bean
public MessageConverter jsonMessageConverter(){
    //return new JsonMessageConverter();
    Jackson2JsonMessageConverter converter =  new   Jackson2JsonMessageConverter();

    converter.setClassMapper(new ClassMapper() {
        @Override
        public Class<?> toClass(MessageProperties properties) {
            return String.class;
        }

        @Override
        public void fromClass(Class<?> clazz, MessageProperties properties) {
        }
    });

    return converter;
}

@Bean
public ConnectionFactory connectionFactory() 
{
    CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory(_rabbitmqHost, _rabbitmqPort);
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() 
{
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    return rabbitTemplate;
}

@Bean
TopicExchange exchange() 
{
    return new TopicExchange("MyExchange");
}

@Bean
public Queue mainQueue() 
{
    return new Queue("MyMainQ");
}

@Bean
public Binding mainRouteBinding() 
{
    return BindingBuilder.bind(mainQueue()).to(exchange()).with("MyMainQ");
}

@Bean
SimpleMessageListenerContainer mainQueueListenerContainer(
        ConnectionFactory connectionFactory, 
        @Qualifier("mainQueueListenerAdapter") MessageListenerAdapter listenerAdapter) 
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(mainQueue());

    container.setMessageConverter(jsonMessageConverter());

    container.setMessageListener(listenerAdapter);
    container.setConcurrentConsumers(1);
    return container;
}

@Bean
MessageListenerAdapter mainQueueListenerAdapter(MainConsumer receiver) 
{
    MessageListenerAdapter msgAdapter = new MessageListenerAdapter(receiver, "receiveMessage");

    msgAdapter.setMessageConverter(jsonMessageConverter());

    return msgAdapter;
}

@Bean
MainConsumer getMainConsumer()
{
    return new MainConsumer();
}

// The receiving method in the MainConsumer class looks like this:
public void receiveMessage(String message) 
{
     // My business logic goes here ...
}

Recommended answer

I had the same problem a couple of months ago, and this helped me. If your RabbitMQ Java client version is earlier than 4.0.0, automatic connection recovery is not enabled by default, so you need to enable it explicitly, like this:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

// connection that will recover automatically
factory.setAutomaticRecoveryEnabled(true);

// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

Connection conn = factory.newConnection();

For details, check the RabbitMQ documentation: Rabbit Client API
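For the third question (checking whether the listener containers are still running and starting the dead ones), one possible approach is a small watchdog that polls each container. The following is only a minimal sketch, not something taken from the question or the Spring AMQP documentation. The `ContainerWatchdog` and `FakeContainer` classes are hypothetical, and the nested `Lifecycle` interface is a local stand-in that mirrors the `isRunning()` and `start()` methods of Spring's `org.springframework.context.Lifecycle`, so that the example runs without Spring on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ContainerWatchdog {

    /** Local stand-in (assumption) for the two Lifecycle methods the watchdog needs. */
    public interface Lifecycle {
        boolean isRunning();
        void start();
    }

    // Containers keyed by a descriptive name, kept in registration order.
    private final Map<String, Lifecycle> containers = new LinkedHashMap<>();

    public void register(String name, Lifecycle container) {
        containers.put(name, container);
    }

    /** Starts every registered container that is not running and returns the names it started. */
    public List<String> restartStopped() {
        List<String> restarted = new ArrayList<>();
        for (Map.Entry<String, Lifecycle> entry : containers.entrySet()) {
            Lifecycle container = entry.getValue();
            if (!container.isRunning()) {
                try {
                    container.start();
                    restarted.add(entry.getKey());
                } catch (RuntimeException ex) {
                    // A failed restart must not prevent the remaining containers from being checked.
                    System.err.println("Could not restart " + entry.getKey() + ": " + ex);
                }
            }
        }
        return restarted;
    }

    /** Hypothetical in-memory container used only to demonstrate the watchdog. */
    public static final class FakeContainer implements Lifecycle {
        private boolean running;

        public FakeContainer(boolean running) {
            this.running = running;
        }

        @Override
        public boolean isRunning() {
            return running;
        }

        @Override
        public void start() {
            running = true;
        }
    }

    public static void main(String[] args) {
        ContainerWatchdog watchdog = new ContainerWatchdog();
        watchdog.register("mainQueueListenerContainer", new FakeContainer(true));
        watchdog.register("imageQueueListenerContainer", new FakeContainer(false));

        System.out.println(watchdog.restartStopped()); // prints [imageQueueListenerContainer]
        System.out.println(watchdog.restartStopped()); // prints []
    }
}
```

In a real application the registered objects would be the SimpleMessageListenerContainer beans themselves, since that class exposes the same `isRunning()` and `start()` methods, and `restartStopped()` could be called periodically, for example from a scheduled task. Note that this only restarts the containers. It does not address why classes failed to load at runtime, which is what the exception logs above show.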
