适用于 AWS SQS 的 Spring 启动轮询器 [英] Spring boot Poller for AWS SQS
本文介绍了适用于 AWS SQS 的 Spring 启动轮询器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
在固定时间间隔后连续调用方法的最佳方法是什么?
What is the best way to call a method continuously after a fixed interval?
我想设计一个轮询器,它可以在定义的时间间隔后自动从 AWS SQS 拉取消息.
I want to design a Poller that can pull messages from AWS SQS automatically after a defined time interval.
非常感谢任何好的建议.
Any good suggestions are much appreciated.
推荐答案
可以使用AWS提供的SDK进行消息轮询
You can use SDK provided by AWS to do polling of messages
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
public class SQSRealtimePoller implements Runnable {
public static final int MAX_MESSAGES = 10;
public static final int DEFAULT_VISIBILITY_TIMEOUT = 15;
//Value greater that 0 makes it long polling, which will reduce SQS cost
public static final int WAIT_TIME = 20;
public static final int PROCESSORS = 2;
ExecutorService executor = Executors.newFixedThreadPool(1);
private String queueUrl;
private AmazonSQS amazonSqs;
ArrayBlockingQueue<Message> messageHoldingQueue = new ArrayBlockingQueue<Message>(
1);
public SQSRealtimePoller(String topic, String queueUrl
) {
this.queueUrl = queueUrl;
this.amazonSqs = getSQSClient();
messageHoldingQueue = new ArrayBlockingQueue<Message>(PROCESSORS);
//process more than 1 messages at a time.
executor = Executors.newFixedThreadPool(PROCESSORS);
}
@Override
public void run() {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
.withQueueUrl(queueUrl)
.withMaxNumberOfMessages(MAX_MESSAGES)
.withVisibilityTimeout(DEFAULT_VISIBILITY_TIMEOUT)
.withWaitTimeSeconds(WAIT_TIME);
while(true){
try {
List<Message> messages = amazonSqs
.receiveMessage(receiveMessageRequest).getMessages();
if (messages == null || messages.size() == 0) {
// If there were no messages during this poll period, SQS
// will return this list as null. Continue polling.
continue;
} else {
for (Message message : messages) {
try {
//will wait here till the queue has free space to add new messages. Read documentation
messageHoldingQueue.put(message);
} catch (InterruptedException e) {
}
Runnable run = new Runnable() {
@Override
public void run() {
try {
Message messageToProcess = messageHoldingQueue
.poll();
//Process your message here
System.out.println(messageToProcess);
//Delete the messages from queue
amazonSqs.deleteMessage(queueUrl,
messageToProcess
.getReceiptHandle());
} catch (Exception e) {
e.printStackTrace();
}
}
};
executor.execute(run);
}}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//Make this singleton
public static AmazonSQS getSQSClient(){
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location , and is in valid format.",
e);
}
AmazonSQS sqs = AmazonSQSClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(Regions.US_WEST_2)
.build();
return sqs;
}}
这篇关于适用于 AWS SQS 的 Spring 启动轮询器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文