@JmsListener 突然停止消费消息,没有任何错误或异常 [英] @JmsListener suddenly stopped consuming messages without any errors or exceptions

查看:36
本文介绍了@JmsListener 突然停止消费消息,没有任何错误或异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个在生产中运行的应用程序,它使用 ActiveMQ 上的虚拟主题,例如

I have an application running on production which consumes a virtual topic on ActiveMQ like

@JmsListener(destination = "Consumer.example-consumer.VirtualTopic.${spring.activemq.example.topic}")
public void consumeTopic(String message) {...}

在 application.properties 中的位置

Where in application.properties

spring.activemq.example.topic=example.topic

我也在 Spring Boot 中配置了我的 Active MQ

Also I have configured my Active MQ in Spring Boot like

@Configuration
@EnableJms
public class AmazonMQConfiguration {
    @Value("${spring.activemq.broker-url}")
    public String brokerURL;
    
    @Value("${spring.activemq.user}")
    public String userName;
    
    @Value("${spring.activemq.password}")
    public String password;
    
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerURL);
        factory.setUserName(userName);
        factory.setPassword(password);
        return factory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate(){
        return new JmsTemplate(activeMQConnectionFactory());
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory());
        factory.setConcurrency("1-1");
        return factory;
    }
}

但昨天突然间,我的 @JmsListener 停止使用来自虚拟主题的消息,并且在 24 小时内我的消费者队列中有大约 82000 条待处理的消息.

But suddenly yesterday my @JmsListener stopped consuming the messages from the virtual topic and there were around 82000 pending messages in my consumer queue in 24 hours.

请帮助我解决这个问题,因为我不知道为什么会出现这个问题并且无法重现它.

Kindly help me with the issue as I am clueless why this issue occurred and unable to recreate it.

在下面,我提供了我的依赖项 pom.xml 代码和我在了解该问题时获取的示例线程转储.

Further below I am providing my dependencies pom.xml code and the sample thread dump which I took when I got to know about the issue.

pom.xml:-

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jersey</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web-services</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    
    <!-- Exclude Spring Boot's Default Logging -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <!-- Add Log4j2 Dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

线程转储:-

    2020-12-16 15:02:17
Full thread dump OpenJDK 64-Bit Server VM (25.265-b01 mixed mode):

"Attach Listener" #723 daemon prio=9 os_prio=0 tid=0x00007f77a8007800 nid=0x33b6 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ActiveMQ InactivityMonitor Worker" #55 daemon prio=5 os_prio=0 tid=0x00007f77a00bc000 nid=0x6571 waiting on condition [0x00007f7782ffd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000855ef0a8> (a java.util.concurrent.SynchronousQueue$TransferStack)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
    at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
    at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-4" #54 daemon prio=5 os_prio=0 tid=0x00007f77b84fb000 nid=0x655c runnable [0x00007f778c5af000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-3" #53 daemon prio=5 os_prio=0 tid=0x00007f77b84f9800 nid=0x655b runnable [0x00007f778c6b0000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-2" #52 daemon prio=5 os_prio=0 tid=0x00007f77b84f7800 nid=0x655a runnable [0x00007f778c7b1000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-1" #51 daemon prio=5 os_prio=0 tid=0x00007f77b84f6800 nid=0x6559 runnable [0x00007f778c8b2000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"DestroyJavaVM" #50 prio=5 os_prio=0 tid=0x00007f77dc04c800 nid=0x651e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ActiveMQ Transport: ssl://b-d15b4567-aw8b-4k80-8jd9-343hht45ab97-2.mq.ap-south-1.amazonaws.com/172.158.11.158:61617" #44 prio=5 os_prio=0 tid=0x00007f77a8009800 nid=0x6554 runnable [0x00007f778d2c6000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990)
    - locked <0x00000000855731a0> (a java.lang.Object)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    - locked <0x00000000855c6688> (a sun.security.ssl.AppInputStream)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)

"DefaultMessageListenerContainer-1" #42 prio=5 os_prio=0 tid=0x00007f77dd72c800 nid=0x6552 waiting on condition [0x00007f778d4c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008a8e04d0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
    at reactor.core.publisher.Mono.block(Mono.java:1680)
    at com.main.service.SingularConnectionService.sendDataToSingular(SingularConnectionService.java:38)
    at com.main.consumer.AmazonMQConsumer.consumeTopic(AmazonMQConsumer.java:62)
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

"ActiveMQ InactivityMonitor WriteCheckTimer" #40 daemon prio=5 os_prio=0 tid=0x00007f7798038000 nid=0x6550 in Object.wait() [0x00007f778d6ca000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:552)
    - locked <0x000000008556c420> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:505)

"ActiveMQ Transport: ssl://b-d15b4567-aw8b-4k80-8jd9-343hht45ab97-2.mq.ap-south-1.amazonaws.com/172.158.11.158:61617" #38 prio=5 os_prio=0 tid=0x00007f7794571800 nid=0x654e runnable [0x00007f778d8cc000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990)
    - locked <0x00000000855d9288> (a java.lang.Object)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    - locked <0x00000000855da5e8> (a sun.security.ssl.AppInputStream)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)

"ActiveMQ InactivityMonitor ReadCheckTimer" #37 daemon prio=5 os_prio=0 tid=0x00007f7794210800 nid=0x654d in Object.wait() [0x00007f778d9cd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:552)
    - locked <0x00000000855eebe0> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:505)

"DefaultMessageListenerContainer-1" #36 prio=5 os_prio=0 tid=0x00007f77dd743000 nid=0x654c in Object.wait() [0x00007f778dcce000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
    - locked <0x00000000855eac88> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:132)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:418)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:303)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

"http-nio-8082-Acceptor" #34 daemon prio=5 os_prio=0 tid=0x00007f77dd827800 nid=0x654a runnable [0x00007f778ded0000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:419)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:247)
    - locked <0x00000000853bcca0> (a java.lang.Object)
    at org.apache.tomcat.util.net.NioEndpoint.serverSocketAccept(NioEndpoint.java:469)
    at org.apache.tomcat.util.net.NioEndpoint.serverSocketAccept(NioEndpoint.java:71)
    at org.apache.tomcat.util.net.Acceptor.run(Acceptor.java:106)
    at java.lang.Thread.run(Thread.java:748)

推荐答案

单个线程转储没有多大帮助,因为它仅提供应用程序正在执行的操作的快照.您真正需要的是一系列线程转储,以便您可以查看线程随着时间的推移在做什么.但是,由于您提供的只是一个单线程转储,我想说您的问题是这个线程:

A single thread dump doesn't help much because it only provides a snapshot of what the application is doing. What you really need is a series of thread dumps so you can see what the threads are doing over time. However, since all you've provided is a single thread dump I'd say your problem is this thread:

"DefaultMessageListenerContainer-1" #42 prio=5 os_prio=0 tid=0x00007f77dd72c800 nid=0x6552 waiting on condition [0x00007f778d4c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008a8e04d0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
    at reactor.core.publisher.Mono.block(Mono.java:1680)
    at com.main.service.SingularConnectionService.sendDataToSingular(SingularConnectionService.java:38)
    at com.main.consumer.AmazonMQConsumer.consumeTopic(AmazonMQConsumer.java:62)
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

这是实际消耗来自主题的消息的线程.请注意,它在此方法中被阻止:

This is the thread which is actually consuming the message from the topic. Notice that it is blocked in this method:

com.main.service.SingularConnectionService.sendDataToSingular()

不清楚这个阻塞发生了多久,但是这个阻塞可能永远不会清除并且基本上阻止您的消费者接收更多消息.由于您使用的是 setConcurrency("1-1"),您只有一个消费者,因此如果它被阻止,则不会再消费任何消息.

It's not clear how long this blocking happens, but it's possible that this block never clears up and basically prevents your consumer from receiving any more messages. Since you're using setConcurrency("1-1") you only have one consumer so if it's blocked then no more messages will be consumed.

你应该调查这个方法在做什么,并确保你调用任何带有超时的阻塞操作,这样线程就不会被无限期地阻塞.

You should investigate what this method is doing and ensure you invoke any blocking operations with a timeout so threads can't be blocked indefinitely.

这篇关于@JmsListener 突然停止消费消息,没有任何错误或异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆