正在使sqs-consumer在sqs可伸缩中检测receiveMessage事件 [英] is putting sqs-consumer to detect receiveMessage event in sqs scalable

查看:127
本文介绍了正在使sqs-consumer在sqs可伸缩中检测receiveMessage事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用aws sqs作为消息队列. sqs.sendMessage发送数据后,我想通过无限循环或事件触发以可伸缩方式检测sqs.receiveMessage.然后我遇到了 sqs-消费者 在收到消息后立即处理sqs.receiveMessage事件.但是我想知道,这是处理微服务之间消息传递的最合适方法,还是还有其他更好的方法来处理这种事情?

I am using aws sqs as message queue. After sqs.sendMessage sends the data , I want to detect sqs.receiveMessage via either infinite loop or event triggering in scalable way. Then I came accross sqs-consumer to handle sqs.receiveMessage events, the moment it receives the messages. But I was wondering , is it the most suitable way to handle message passing between microservices or is there any other better way to handle this thing?

推荐答案

我已经用Java编写了用于使用SQSBufferedAsyncClient从sqs队列中获取数据的代码,使用此API的好处是可以在异步模式下缓冲消息.

I had written the code in java for fetching the data from sqs queue with SQSBufferedAsyncClient, advantages using this API is buffered the messages in async mode.

/**
 * 
 */
package com.sxm.aota.tsc.config;

import java.net.UnknownHostException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.retry.RetryPolicy.BackoffStrategy;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import com.amazonaws.services.sqs.buffered.QueueBufferConfig;

@Configuration
public class SQSConfiguration {

    /** The properties cache config. */
    @Autowired
    private PropertiesCacheConfig propertiesCacheConfig;

    @Bean
    public AmazonSQSAsync amazonSQSClient() {
        // Create Client Configuration
        ClientConfiguration clientConfig = new ClientConfiguration()
            .withMaxErrorRetry(5)
            .withConnectionTTL(10_000L)
            .withTcpKeepAlive(true)
            .withRetryPolicy(new RetryPolicy(
                    null, 
                new BackoffStrategy() {                 
                    @Override
                    public long delayBeforeNextRetry(AmazonWebServiceRequest req, 
                            AmazonClientException exception, int retries) {
                        // Delay between retries is 10s unless it is UnknownHostException 
                        // for which retry is 60s
                        return exception.getCause() instanceof UnknownHostException ? 60_000L : 10_000L;
                    }
                }, 10, true));
        // Create Amazon client
        AmazonSQSAsync asyncSqsClient = null;
        if (propertiesCacheConfig.isIamRole()) {
            asyncSqsClient = new AmazonSQSAsyncClient(new InstanceProfileCredentialsProvider(true), clientConfig);
        } else {
            asyncSqsClient = new AmazonSQSAsyncClient(
                    new BasicAWSCredentials("sceretkey", "accesskey"));
        }
        final Regions regions = Regions.fromName(propertiesCacheConfig.getRegionName());
        asyncSqsClient.setRegion(Region.getRegion(regions));
        asyncSqsClient.setEndpoint(propertiesCacheConfig.getEndPoint());

        // Buffer for request batching
        final QueueBufferConfig bufferConfig = new QueueBufferConfig();
        // Ensure visibility timeout is maintained
        bufferConfig.setVisibilityTimeoutSeconds(20);
        // Enable long polling
        bufferConfig.setLongPoll(true);
        // Set batch parameters
//      bufferConfig.setMaxBatchOpenMs(500);
        // Set to receive messages only on demand
//      bufferConfig.setMaxDoneReceiveBatches(0);
//      bufferConfig.setMaxInflightReceiveBatches(0);

        return new AmazonSQSBufferedAsyncClient(asyncSqsClient, bufferConfig);
    }

}

然后编写了scheduleR,它每2秒执行一次并从队列中获取数据,进行处理,然后在可见性超时之前将其从队列中删除,否则当可见性tiiimeout再次到期时,它将准备再次处理.

then written the scheduleR which executes after every 2 secs and fetches the data from queue, process it and delete it from queue before visibility timeout otherwise it will be ready for processing again when visibility tiiimeout expires again.

package com.sxm.aota.tsc.sqs;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.databind.ObjectMapper;


    /**
     * The Class TSCDataSenderScheduledTask.
     * 
     * Sends the aggregated Vehicle data to TSC in batches
     */
    @EnableScheduling
    @Component("sqsScheduledTask")
    @DependsOn({ "propertiesCacheConfig", "amazonSQSClient" })
    public class SQSScheduledTask {

        private static final Logger LOGGER = LoggerFactory.getLogger(SQSScheduledTask.class);
        @Autowired
        private PropertiesCacheConfig propertiesCacheConfig;
        @Autowired
        public AmazonSQSAsync amazonSQSClient;

        /**
         * Timer Task that will run after specific interval of time Majorly
         * responsible for sending the data in batches to TSC.
         */
        private String queueUrl;
        private final ObjectMapper mapper = new ObjectMapper();

        @PostConstruct
        public void initialize() throws Exception {
            LOGGER.info("SQS-Publisher", "Publisher initializing for queue " + propertiesCacheConfig.getSQSQueueName(),
                    "Publisher initializing for queue " + propertiesCacheConfig.getSQSQueueName());
            // Get queue URL
            final GetQueueUrlRequest request = new GetQueueUrlRequest().withQueueName(propertiesCacheConfig.getSQSQueueName());
            final GetQueueUrlResult response = amazonSQSClient.getQueueUrl(request);
            queueUrl = response.getQueueUrl();

            LOGGER.info("SQS-Publisher", "Publisher initialized for queue " + propertiesCacheConfig.getSQSQueueName(),
                    "Publisher initialized for queue " + propertiesCacheConfig.getSQSQueueName() + ", URL = " + queueUrl);
        }

        @Scheduled(fixedDelayString = "${sqs.consumer.delay}")
        public void timerTask() {

            final ReceiveMessageResult receiveResult = getMessagesFromSQS();
            String messageBody = null;
            if (receiveResult != null && receiveResult.getMessages() != null && !receiveResult.getMessages().isEmpty()) {
                try {
                    messageBody = receiveResult.getMessages().get(0).getBody();
                    String messageReceiptHandle = receiveResult.getMessages().get(0).getReceiptHandle();
                    Vehicles vehicles = mapper.readValue(messageBody, Vehicles.class);
                    processMessage(vehicles.getVehicles(),messageReceiptHandle);
                } catch (Exception e) {
                    LOGGER.error("Exception while processing SQS message : {}", messageBody);
                    // Message is not deleted on SQS and will be processed again after visibility timeout
                }
            }
        }

        public void processMessage(List<Vehicle> vehicles,String messageReceiptHandle) throws InterruptedException {
            //processing code
            //delete the sqs message as the processing is completed
            //Need to create atomic counter that will be increamented by all TS.. Once it will be 0 then we will be deleting the messages

                    amazonSQSClient.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));

        }

        private ReceiveMessageResult getMessagesFromSQS() {
            try {
                // Create new request and fetch data from Amazon SQS queue
                final ReceiveMessageResult receiveResult = amazonSQSClient
                        .receiveMessage(new ReceiveMessageRequest().withMaxNumberOfMessages(1).withQueueUrl(queueUrl));
                return receiveResult;
            } catch (Exception e) {
                LOGGER.error("Error while fetching data from SQS", e);
            }
            return null;
        }

    }

这篇关于正在使sqs-consumer在sqs可伸缩中检测receiveMessage事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆