在 SQS 队列中使用多个消费者 [英] Using many consumers in SQS Queue

查看:81
本文介绍了在 SQS 队列中使用多个消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我知道可以使用多个线程来使用 SQS 队列.我想保证每条消息都会被消费一次.我知道可以更改消息的可见性超时,例如,等于我的处理时间.如果我的进程花费的时间超过可见性超时(例如连接速度较慢),则其他线程可以使用相同的消息.

I know that it is possible to consume a SQS queue using multiple threads. I would like to guarantee that each message will be consumed once. I know that it is possible to change the visibility timeout of a message, e.g., equal to my processing time. If my process spend more time than the visibility timeout (e.g. a slow connection) other thread can consume the same message.

保证消息被处理一次的最佳方法是什么?

What is the best approach to guarantee that a message will be processed once?

推荐答案

保证消息被处理一次的最佳方法是什么?

What is the best approach to guarantee that a message will be processed once?

您要求的是保证 - 您不会得到保证.您可以将邮件被多次处理的概率降低到非常小的数量,但您不会获得保证.

You're asking for a guarantee - you won't get one. You can reduce probability of a message being processed more than once to a very small amount, but you won't get a guarantee.

我将解释原因以及减少重复的策略.

I'll explain why, along with strategies for reducing duplication.

  1. 当您将消息放入 SQS 时,SQS 实际上可能会多次收到该消息
    • 例如:发送消息时的轻微网络故障导致自动重试的暂时性错误 - 从消息发送者的角度来看,它失败了一次,并成功发送了一次,但 SQS 收到了这两条消息.
  • 与第一个示例类似 - 有很多计算机在幕后处理消息,并且 SQS 需要确保不会丢失任何信息 - 消息存储在多个服务器上,这可能会导致重复.

在大多数情况下,通过利用 SQS 消息可见性超时,从这些来源复制的机会已经很小 - 就像百分之几小.

For the most part, by taking advantage of SQS message visibility timeout, the chances of duplication from these sources are already pretty small - like fraction of a percent small.

如果处理重复项确实那么糟糕(努力做到使您的消息消费成为幂等的!),我认为这已经足够了 - 进一步减少重复的机会很复杂而且可能很昂贵......

If processing duplicates really isn't that bad (strive to make your message consumption idempotent!), I'd consider this good enough - reducing chances of duplication further is complicated and potentially expensive...

好的,现在我们进入兔子洞......在高层次上,您需要为您的消息分配唯一的 id,并在开始处理之前检查正在进行或已完成的 id 的原子缓存:

Ok, here we go down the rabbit hole... at a high level, you will want to assign unique ids to your messages, and check against an atomic cache of ids that are in progress or completed before starting processing:

  1. 确保您的消息具有在插入时提供的唯一标识符
    • 没有这个,您将无法区分重复项.
  • 如果您的消息接收者需要将消息发送到箱外进行进一步处理,那么它可能是另一个重复来源(出于与上述类似的原因)
  • InProgress 条目应根据处理失败时需要恢复的速度设置超时.
  • 完成的条目应根据您希望重复数据删除窗口的时间设置超时
  • 最简单的可能是番石榴缓存,但是仅适用于单个处理应用程序.如果您有大量消息或分布式消费,请考虑为这项工作建立一个数据库(带有后台进程来清除过期条目)
  • InProgress entries should have a timeout based on how fast you need to recover in case of processing failure.
  • Completed entries should have a timeout based on how long you want your deduplication window
  • The simplest is probably a Guava cache, but would only be good for a single processing app. If you have a lot of messages or distributed consumption, consider a database for this job (with a background process to sweep for expired entries)
  • 不过,您可能负担不起无限存储空间.

一些注意事项

  • 请记住,在没有所有这些的情况下重复的机会已经很低了.根据消息重复数据删除的时间和金钱对您的价值,您可以随意跳过或修改任何步骤
    • 例如,您可以省略InProgress",但这会增加两个线程同时处理重复消息的可能性很小(第二个线程在第一个线程完成"之前开始)
      • 您的应用程序可能会在处理消息后立即崩溃/挂起/执行很长时间的 GC,但在 messageId 为已完成"之前(也许您正在为此存储使用数据库并且与它的连接已断开)
      • 在这种情况下,Processing"最终将过期,另一个线程可以处理此消息(在 SQS 可见性超时也过期后或因为 SQS 中有重复项).

      这篇关于在 SQS 队列中使用多个消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆