适用于 AWS SQS 的 Spring 启动轮询器 [英] Spring boot Poller for AWS SQS

查看:37
本文介绍了适用于 AWS SQS 的 Spring 启动轮询器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在固定时间间隔后连续调用方法的最佳方法是什么?

What is the best way to call a method continuously after a fixed interval?

我想设计一个轮询器,它可以在定义的时间间隔后自动从 AWS SQS 拉取消息.

I want to design a Poller that can pull messages from AWS SQS automatically after a defined time interval.

非常感谢任何好的建议.

Any good suggestions are much appreciated.

推荐答案

可以使用AWS提供的SDK进行消息轮询

You can use SDK provided by AWS to do polling of messages

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

public class SQSRealtimePoller implements Runnable {


public static final int MAX_MESSAGES = 10;

public static final int DEFAULT_VISIBILITY_TIMEOUT = 15;

//Value greater that 0 makes it long polling, which will reduce SQS cost
public static final int WAIT_TIME = 20;
public static final int PROCESSORS = 2;

ExecutorService executor = Executors.newFixedThreadPool(1);
private String queueUrl;

private AmazonSQS amazonSqs;

ArrayBlockingQueue<Message> messageHoldingQueue = new ArrayBlockingQueue<Message>(
        1);

public SQSRealtimePoller(String topic, String queueUrl
       ) {
    this.queueUrl = queueUrl;
    this.amazonSqs = getSQSClient();
    messageHoldingQueue = new ArrayBlockingQueue<Message>(PROCESSORS);
    //process more than 1 messages at a time. 
    executor = Executors.newFixedThreadPool(PROCESSORS);

}

@Override
public void run() {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
            .withQueueUrl(queueUrl)
            .withMaxNumberOfMessages(MAX_MESSAGES)
            .withVisibilityTimeout(DEFAULT_VISIBILITY_TIMEOUT)
            .withWaitTimeSeconds(WAIT_TIME);
    while(true){
        try {
            List<Message> messages = amazonSqs
                    .receiveMessage(receiveMessageRequest).getMessages();
            if (messages == null || messages.size() == 0) {
                // If there were no messages during this poll period, SQS
                // will return this list as null. Continue polling.
                continue;
            } else {
                for (Message message : messages) {
                    try {
                        //will wait here till the queue has free space to add new messages. Read documentation
                        messageHoldingQueue.put(message);
                    } catch (InterruptedException e) {

                    }
                    Runnable run = new Runnable() {
                        @Override
                        public void run() {

                            try {
                                Message messageToProcess = messageHoldingQueue
                                        .poll();
                               //Process your message here 
                                
                                System.out.println(messageToProcess);
                                
                                //Delete the messages from queue
                                    amazonSqs.deleteMessage(queueUrl,
                                            messageToProcess
                                                    .getReceiptHandle());
                                
                            } catch (Exception e) {
                                       e.printStackTrace();
                            }
                        }
                    };
                    executor.execute(run);
    }}
            }       catch (Exception e) {
                e.printStackTrace();
                }
    
}
}

//Make this singleton
public static AmazonSQS getSQSClient(){
    ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
    try {
        credentialsProvider.getCredentials();
    } catch (Exception e) {
        throw new AmazonClientException(
                "Cannot load the credentials from the credential profiles file. " +
                "Please make sure that your credentials file is at the correct " +
                "location , and is in valid format.",
                e);
    }

    AmazonSQS sqs = AmazonSQSClientBuilder.standard()
                           .withCredentials(credentialsProvider)
                           .withRegion(Regions.US_WEST_2)
                           .build();
    return sqs;
}}

这篇关于适用于 AWS SQS 的 Spring 启动轮询器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆