ThreadPoolExecutor: TaskRejectedException from Executor


Problem description

My application reads messages through a JMS MessageListener class, and at some point it starts throwing TaskRejectedException. I know most of you will say that the pool has reached maxPoolSize and the queue is also full.

But I observed something. The number of messages sent to the queue that the MessageListener class reads from is 10353, and my Spring configuration for the thread pool executor is below:

<bean id="ticketReaderThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy">
    <property name="corePoolSize" value="10" />
    <property name="maxPoolSize" value="150" />
    <property name="queueCapacity" value="11000" />
</bean>

As far as I can tell, maxPoolSize is more than enough to handle this many requests. So if any of you can suggest a cause other than a maxPoolSize breach, please do.

This is the second time we have faced this issue. Previously we tried increasing maxPoolSize, but 15 days later the exception is back, occurring around 5000 to 8000 times a day.

Update:

This is the full stack trace of the exception:

General Exception occurred while reading from Queue/Processing the message
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@408b9775] did not accept task: com.batman.rapid.rapidserver.sla.TicketHandler@1be5e598
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
    at com.batman.rapid.rapidserver.sla.JmsTicketReceiver.onMessage(JmsTicketReceiver.java:58)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.util.concurrent.RejectedExecutionException
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:241)
    ... 10 more

Here is the relevant code:

if (message instanceof TextMessage)
{
    textMessage = (TextMessage) message;
    ticketReaderThreadPool.execute(new TicketHandler(textMessage.getText()));
}

Here is the configuration, as requested:

    <!-- End of JMS Queue Support -->

    <bean id="ticketReaderThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy">
    <property name="corePoolSize" value="10" />
    <property name="maxPoolSize" value="150" />
    <property name="queueCapacity" value="11000" />
</bean>

    <bean id="notificationThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy">
            <property name="corePoolSize" value="10" />
            <property name="maxPoolSize" value="100" />
            <property name="queueCapacity" value="10000" />
    </bean>

    <bean id="notificationManager" class="com.batman.rapid.rapidserver.sla.scheduler.NotificationManager" scope="singleton">
            <property name="defaultPercent" value="80"></property>
    </bean>

    <bean id="dbUpdateThreads" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy">
            <property name="corePoolSize" value="1" />
            <property name="maxPoolSize" value="100" />
            <property name="queueCapacity" value="10000" />
    </bean>

Recommended answer

I'm not sure what the exact cause is. However, I strongly suspect that the number of pending messages is exceeding the queueCapacity you set. Also, the very approach you are using deviates from the usual way JMS is used. Normally, with DefaultMessageListenerContainer, we configure the maximum number of concurrent consumers so that messages are processed in parallel, and we choose that maximum based on what the server's resources can handle.

In your case, however, the DefaultMessageListenerContainer reads the message and your listener hands the business logic off to a separate pool thread. Because the listener reads messages faster than the business logic can process them, tasks accumulate in the executor's task queue.
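The accumulation described above can be reproduced with the plain JDK executor. The sketch below is only an illustration under stated assumptions: the class name `RejectionDemo`, the helper names, and the tiny pool sizes are hypothetical, and the tasks simply block on a latch to stand in for slow business logic. It relies on one documented property of `java.util.concurrent.ThreadPoolExecutor`: threads beyond the core size are created only once the queue is full, and a task is rejected when the queue is full and the pool is already at its maximum size.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    /** Submits `tasks` blocking tasks and returns how many the pool rejected. */
    static int countRejected(int core, int max, int queueCapacity, int tasks) {
        // No handler is passed, so the JDK default (AbortPolicy) is used,
        // the same policy that appears in the stack trace.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        final CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Each task blocks until released: a stand-in for slow business logic.
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++; // queue full and pool already at max size
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 1 core thread + 3 queued + 1 extra thread = 5 accepted; the 6th is rejected.
        System.out.println("rejected: " + countRejected(1, 2, 3, 6)); // prints "rejected: 1"
    }
}
```

Scaled up to the configuration in the question, the same rule suggests that only the 10 core threads do any work until 11000 tasks are waiting, so a burst or a stall in the business logic can fill the queue well before the pool grows towards 150 threads.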

I would suggest rethinking your current implementation, because it does not support transactions. When the server crashes, all message processing still pending in the executor is terminated and lost, and because you presumably use auto-acknowledge, those messages have already been removed from the JMS queue. There are other issues too, such as having to size the executor's queue limit by hand.
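If the hand-off to an executor is kept for now, one stopgap for the queue-limit problem is to replace the default AbortPolicy with the JDK's `ThreadPoolExecutor.CallerRunsPolicy`, so that a saturated pool makes the submitting thread run the task itself instead of throwing. This is not what the answer recommends, and it does nothing for the transaction concern; it is only a sketch of backpressure, with a hypothetical class name `CallerRunsDemo` and deliberately tiny pool sizes. Spring's ThreadPoolTaskExecutor accepts a handler through its `rejectedExecutionHandler` property.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsDemo {

    /** Submits `tasks` short tasks to a tiny pool and returns how many completed. */
    static int runAll(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                // When the pool is saturated, run the task on the submitting thread.
                new ThreadPoolExecutor.CallerRunsPolicy());
        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                pause(5); // stand-in for business logic
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // More tasks than the pool and queue can hold, yet none is rejected.
        System.out.println("completed: " + runAll(20)); // prints "completed: 20"
    }
}
```

The trade-off is that the submitting thread, which in the question would be the JMS listener thread, is busy while it runs an overflow task, so message consumption slows to the pace of the business logic rather than piling up in memory.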

Finally, coming back to your issue: as a quick check, see whether any of the 150 threads you configured are stuck on locks.
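A thread dump taken with the JDK's `jstack` tool is the usual way to do this check from outside the process. From inside the JVM, a rough equivalent can be sketched with the standard `java.lang.management` API, as below. The class name `LockCheck` and its method names are hypothetical, and the counts are only a snapshot at the moment of the call. Note that `findDeadlockedThreads()` may throw UnsupportedOperationException on a JVM that does not support monitoring of ownable synchronizers.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

class LockCheck {

    /** Counts live threads currently BLOCKED, i.e. waiting to enter a monitor. */
    static int countBlockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        int blocked = 0;
        // false, false: skip the more expensive locked monitor/synchronizer details.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                blocked++;
            }
        }
        return blocked;
    }

    /** Counts threads that are part of a detected deadlock cycle; 0 if none. */
    static int countDeadlockedThreads() {
        long[] ids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        return ids == null ? 0 : ids.length;
    }

    public static void main(String[] args) {
        System.out.println("blocked threads:    " + countBlockedThreads());
        System.out.println("deadlocked threads: " + countDeadlockedThreads());
    }
}
```

A consistently high blocked count among the pool's worker threads would suggest that the business logic is contending on a lock, which would explain why the executor's queue keeps filling up.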
