设置手动确认SQS消息的Spring Cloud AWS问题 [英] Spring Cloud AWS Issue with setting manual acknowledge of SQS message

查看:248
本文介绍了设置手动确认SQS消息的Spring Cloud AWS问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过使用spring-cloud-aws-messaging手动删除AWS SQS消息来实现逻辑.此功能是在此票证

I'm trying to implement logic with manual deleting of AWS SQS message using spring-cloud-aws-messaging. This feature was implemented in scope of this ticket from the example in tests

@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {

    LOGGER.info("Received message {}", message.getFoo());
    try {
        acknowledgment.acknowledge().get();
    } catch (InterruptedException e) {
        LOGGER.error("Opps", e);
    } catch (ExecutionException e) {
        LOGGER.error("Opps", e);
    }
}

但遇到意外的异常

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of org.springframework.cloud.aws.messaging.listener.Acknowledgment (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance oforg.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

使用SqsMessageDeletionPolicy.ON_SUCCESS的解决方案有效,但我想避免引发异常.

Solution with SqsMessageDeletionPolicy.ON_SUCCESS works but I want to avoid throwing an exception.

我在配置中错过了什么?

What have I missed in the configuration?

推荐答案

花了一些力气,尝试了与其他SO答案不同的事情.

It took some fiddling around and trying different things from other SO answers.

这是我的代码,我将尽力解释.我包括了我要用于SQS消费者的所有内容.

Here is my code and I'll try to explain as best I can. I'm including everything that I'm using for my SQS consumer.

我的配置类如下.下面仅需注意的一点不是在queueMessageHandlerFactory方法中实例化的转换器和解析器对象. MappingJackson2MessageConverter(如果从如此明显的类名中看不出来的话)类处理来自SQS的有效负载的反序列化.

My config class is below. Only not-so-obvious thing to note below is the converter and resolver objects instantiated in the queueMessageHandlerFactory method. The MappingJackson2MessageConverter (in case it isn't obvious from the oh-so-obvious class name) class handles the deserialization of the payload from SQS.

将严格的内容类型匹配设置为false也很重要.

It's also important that the strict content type match be set to false.

此外,MappingJackson2MessageConverter允许您设置自己的Jackson ObjectMapper,但是如果这样做,则需要按以下方式进行配置:

Also, the MappingJackson2MessageConverter allows you to set your own Jackson ObjectMapper, however if you do that you will need to configure it as follows:

objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

您可能不想这样做,因此可以将其保留为空,它将创建自己的ObjectMapper.

You may not want to do that, so you can leave it null and it will create its own ObjectMapper.

我认为代码的其余部分很容易解释……?让我知道是否可以.

I think the rest of the code is pretty self-explanatory...? Let me know if not.

我们的用例之间的一个区别是,看起来您正在映射自己的自定义对象(SqsEventDTO),并且我认为这是可行的吗?在那种情况下,我认为您不需要MappingJackson2MessageConverter,但是我可能是错的.

One difference between our use-cases, it looks like you're mapping your own custom object (SqsEventDTO) and I assume that's working? In that case, I don't think you will need the MappingJackson2MessageConverter, but I could be wrong.

@Configuration
public class AppConfig {

@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
    return queueMessageHandlerFactory.createQueueMessageHandler();
}

@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {

    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    factory.setAmazonSqs(sqsClient);

    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setSerializedPayloadClass(String.class);

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);

    // Uses the MappingJackson2MessageConverter object to resolve/map 
    // the payload against the Message/S3EventNotification argument.
    PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);

    // Extract the acknowledgment data from the payload's headers, 
    // which then gets deserialized into the Acknowledgment object.  
    AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");

    // I don't remember the specifics of WHY, however there is 
    // something important about the order of the argument resolvers 
    // in the list
    factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));

    return factory;
}

@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
    @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {

    SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
    smlc.setWaitTimeOut(20);
    smlc.setAmazonSqs(amazonSQSAsync);
    smlc.setMessageHandler(queueMessageHandler);
    smlc.setBeanName("ConsumerBean");
    smlc.setMaxNumberOfMessages(sqsMaxMessages);
    smlc.setTaskExecutor(threadPoolExecutor);

    return smlc;
}

@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(corePoolSize);
    executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setKeepAliveSeconds(threadTimeoutSeconds);
    executor.setThreadNamePrefix(threadName);
    executor.initialize();

    return executor;
}
}

我的SQS消费者服务类别如下.

My SQS consumer Service class is below.

@Service
public class RawConsumer {

@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
    // Handle event here
}

希望对您有帮助.

这篇关于设置手动确认SQS消息的Spring Cloud AWS问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆