org.springframework.core.task.TaskRejectedException while listening for SQS queue?

Problem description

I created a basic workflow that consumes messages from SQS with @SqsListener. It works fine, but I constantly get lots of similar messages:

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@372b568[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@4c30c2f9
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:317) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@4c30c2f9 rejected from java.util.concurrent.ThreadPoolExecutor@372b568[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) [na:1.8.0_171]
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:314) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    ... 6 common frames omitted

My configuration bean:

@EnableSqs
@Configuration
public class AmazonSqsConfiguration {

    @Value("${aws.sqs.accessKey}")
    private String accessKey;

    @Value("${aws.sqs.secretKey}")
    private String secretKey;

    @Value("${aws.sqs.region}")
    private String region;

    @Value("${aws.sqs.url}")
    private String url;

    @Bean
    public AmazonSQSAsync amazonSqs() {
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
        return AmazonSQSAsyncClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(url, region))
                .withCredentials(credentialsProvider)
                .build();
    }

}

My consumer is the following:

@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void processSubscription(String xmlNotification) { /* Message processor */ }

Is it possible to get rid of these exceptions by reconfiguring the @Bean? What is the root cause of the issue, and how can I fix it?

I searched for a solution and came across an existing answer, but it doesn't work for me, since I don't use JMS. I couldn't debug the problem because I don't even know what to debug.

Solution

I found a ticket for spring-cloud-aws related to the behavior I encountered. I also found a relevant StackOverflow question.

The solution that worked for me was the following:

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    factory.setMaxNumberOfMessages(10);
    factory.setAutoStartup(true);
    factory.setWaitTimeOut(20);

    return factory;
}
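
As for the root cause, the stack trace itself gives a strong hint: the executor reports pool size = 3, active threads = 3, queued tasks = 0, and the rejection comes from ThreadPoolExecutor$AbortPolicy. In other words, the pool has no queue, so once all three threads are busy, every further task is rejected. The following is a minimal sketch of that mechanism using only java.util.concurrent, not Spring Cloud AWS. The class name RejectionDemo and the method countRejected are hypothetical names chosen for illustration, and the blocking task merely stands in for a slow message handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: reproduces the rejection seen in the stack trace
// with a plain JDK thread pool (no Spring or AWS classes involved).
public class RejectionDemo {

    // Submits `tasks` blocking tasks to a pool that has no queue
    // and returns how many of them were rejected.
    static int countRejected(int maxPoolSize, int tasks) {
        // SynchronousQueue has no capacity, matching "queued tasks = 0".
        // AbortPolicy is the JDK default and throws RejectedExecutionException.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, maxPoolSize, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // stand-in for a slow message handler
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and nowhere to queue the task
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // A batch of 10 tasks against a pool capped at 3 threads.
        System.out.println("rejected = " + countRejected(3, 10)); // prints "rejected = 7"
    }
}
```

With a cap of 3 threads and a batch of 10 tasks, 7 are rejected; with a cap of 10, none are. One plausible reading of the fix above, which the linked ticket would have to confirm, is that setting maxNumberOfMessages explicitly lets the container size its default thread pool to match the number of messages it fetches per poll.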
