处理SQS item Queue的多线程方法 [英] Multithreaded approach to process SQS item Queue

查看:35
本文介绍了处理SQS item Queue的多线程方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在这个场景中,我必须从队列中轮询 AWS SQS 消息,每个异步请求最多可以获取 10 个 sqs 项目/消息.轮询项目后,我必须在 kubernetes pod 上处理这些项目.项目处理包括从少数 API 调用中获取响应,这可能需要一些时间 &然后将项目保存到 DB &S3.我做了一些 R&D &得出以下结论

In this scenerio, I have to Poll AWS SQS messages from a queue, each async request can fetch upto 10 sqs items/messages. Once I Poll the items, Then I have to process those items on a kubernetes pod. Item processing includes getting response from few API calls, it may take some time & then saving the item to DB & S3. I did some R&D & reach on following conclusion

  1. 为了使用消费者生产者模型,1 个线程将轮询项目 &另一个线程将处理该项目或使用多线程处理项目
  2. 维护一个数据结构,其中包含准备处理的 sqs 轮询项,DS 可能是阻塞集合或并发队列
  3. 使用任务并行库进行线程池和在项目处理中.
  4. 可以使用频道

我的查询

  1. 实现最佳性能或提高 TPS 的最佳方法是什么.
  2. 我可以/应该使用数据流 TPL
  3. 多线程或单线程异步任务

推荐答案

  1. 实现最佳性能或提高 TPS 的最佳方法是什么:

  1. What would be best approach to achieve best performance or increase TPS:

  • 我会考虑根据您的 SQS 队列大小使用 Pod 自动缩放.准备就绪的解决方案:KEDAAWS CW 指标;

我可以/应该使用数据流 TPL:

Can/Should I use data flow TPL:

  • 由于您的大部分操作都是基于 I/O 的,因此我不会走那条路.你身上会有很多被阻塞的线程.这里有类似的讨论;

具有异步任务的多线程或单线程:

Multi threaded or single threaded with async tasks:

  • 由于您正在运行 I/O 操作(S3 上传、数据库查询),您的线程将启动进程并返回池中,等待轮到它继续执行任务.虽然它是免费的,但它可以用于启动其他进程.也就是说,您不需要两个线程来完成这项工作,但我会让任务调度程序来决定.我想:

var dbSaveTask = KickOffTheDbSave();
var s3SaveTask = KickOffTheS3Save();
await Task.WhenAll(dbSaveTask, s3SaveTask);

我不完全了解您的流程,因此这些不是建议,而是需要考虑的事项:

I am not fully aware of your processes so these are not recommendations but rather things to consider:

  • 由于您使用 AWS 并将文件保存到 S3,因此您可以附加由 S3 上传事件触发的 lambda,然后将项目存储到数据库中.您可以将其视为某些交易.在您拥有文档之前,您不会在数据库中拥有记录.此外,它将分离流程以遵循 SOLID 原则;
  • 考虑使用Bulkhead Isolation 政策来限制资源.
  • Since you're working with AWS, and are saving files into S3, you could attach a lambda triggered by S3 upload event to then store the item into the database. You can think of it as of certain transaction. You won't have a record in the database until you have the document. Also, it will segregate the processes to follow SOLID principles;
  • Consider using Bulkhead Isolation policy for recourses limiting.

这篇关于处理SQS item Queue的多线程方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆