AWS SQS Consumer - Poll only when messages can be processed immediately


Question

I'm trying to create an AWS SQS consumer, running as a Windows service, that polls messages in batches of 10. Each message will be processed in its own task, so that messages are handled in parallel. Processing a message involves calling different APIs and sending email, so it might take some time.

My problem is twofold. First, I only want to poll the queue when 10 messages can be processed immediately. The reason is the SQS visibility timeout: if received messages have to "wait" before being processed, they might exceed the visibility timeout and go "back" on the queue, which produces duplicates. I don't think tweaking the visibility timeout is a good fix, because there is still a chance that messages will be duplicated, and that's what I'm trying to avoid. Second, I want some sort of limit on parallelism (e.g. a maximum of 100 concurrent tasks), so that server resource usage is kept in check, since other apps are also running on the server.

How can I achieve this? Or is there any other way to remedy these problems?

Recommended answer

This answer makes the following assumptions:

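  1. Fetching messages from AWS should be serialized. Only the processing of messages should be parallelized.
  2. Every message fetched from AWS should be processed. The whole execution should not terminate before all fetched messages have had a chance to be processed.
  3. Every message-processing operation should be awaited. The whole execution should not terminate before all started tasks have completed.
  4. Any error that occurs while processing a message should be ignored. The whole execution should not terminate because the processing of a single message failed.
  5. Any error that occurs while fetching messages from AWS should be fatal. The whole execution should terminate, but not before all currently running message-processing operations have completed.
  6. The execution mechanism should be able to handle the case where the fetch-from-AWS operation returns a batch with a different number of messages than requested.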

Below is an implementation that (hopefully) satisfies these requirements:

/// <summary>
/// Starts an execution loop that fetches batches of messages sequentially,
/// and processes the messages of each batch in parallel.
/// </summary>
public static async Task ExecutionLoopAsync<TMessage>(
    Func<int, Task<TMessage[]>> fetchMessagesAsync,
    Func<TMessage, Task> processMessageAsync,
    int fetchCount,
    int maxDegreeOfParallelism,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);

    // Count how many times we have acquired the semaphore, so that we know
    // how many more times we have to acquire it before we exit from this method.
    int acquiredCount = 0;
    try
    {
        while (true)
        {
            Debug.Assert(acquiredCount == 0);
            for (int i = 0; i < fetchCount; i++)
            {
                await semaphore.WaitAsync(cancellationToken);
                acquiredCount++;
            }

            TMessage[] messages = await fetchMessagesAsync(fetchCount)
                ?? Array.Empty<TMessage>();

            for (int i = 0; i < messages.Length; i++)
            {
                if (i >= fetchCount) // We got more messages than we asked for
                {
                    await semaphore.WaitAsync();
                    acquiredCount++;
                }
                ProcessAndRelease(messages[i]);
                acquiredCount--;
            }

            if (messages.Length < fetchCount)
            {
                // We got fewer messages than we asked for
                semaphore.Release(fetchCount - messages.Length);
                acquiredCount -= fetchCount - messages.Length;
            }

            // This method is 'async void' because it is not expected to throw ever
            async void ProcessAndRelease(TMessage message)
            {
                try { await processMessageAsync(message); }
                catch { } // Swallow exceptions
                finally { semaphore.Release(); }
            }
        }
    }
    catch (SemaphoreFullException)
    {
        // Guard against the (unlikely) scenario that the counting logic is flawed.
        // The counter is no longer reliable, so skip the awaiting in finally.
        acquiredCount = maxDegreeOfParallelism;
        throw;
    }
    finally
    {
        // Wait for all pending operations to complete. This could cause a deadlock
        // in case the counter has become out of sync.
        for (int i = acquiredCount; i < maxDegreeOfParallelism; i++)
            await semaphore.WaitAsync();
    }
}
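
The `// Arguments validation omitted` comment leaves the checks to the reader. One of them matters for correctness: the loop acquires `fetchCount` semaphore slots before every fetch, so if `fetchCount` is larger than `maxDegreeOfParallelism` that acquisition can never complete and the loop hangs. A minimal sketch of such checks follows; the class and method names (`ExecutionLoopGuards.Validate`) are hypothetical and not part of the original answer.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not from the original answer): one possible way
// to fill in the "Arguments validation omitted" placeholder.
public static class ExecutionLoopGuards
{
    public static void Validate<TMessage>(
        Func<int, Task<TMessage[]>> fetchMessagesAsync,
        Func<TMessage, Task> processMessageAsync,
        int fetchCount,
        int maxDegreeOfParallelism)
    {
        if (fetchMessagesAsync == null)
            throw new ArgumentNullException(nameof(fetchMessagesAsync));
        if (processMessageAsync == null)
            throw new ArgumentNullException(nameof(processMessageAsync));

        // At least one message must be requested per fetch.
        if (fetchCount < 1)
            throw new ArgumentOutOfRangeException(nameof(fetchCount));

        // The loop reserves fetchCount slots before each fetch, so the
        // semaphore must have at least that many slots in total.
        if (maxDegreeOfParallelism < fetchCount)
            throw new ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism),
                "Must be greater than or equal to fetchCount.");
    }
}
```

With the values from the question (`fetchCount: 10`, `maxDegreeOfParallelism: 100`) these checks pass. Swapping the two values would be rejected up front instead of hanging on the eleventh `WaitAsync`.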

Usage example:

var cts = new CancellationTokenSource();

Task executionTask = ExecutionLoopAsync<Message>(async count =>
{
    return await GetBatchFromAwsAsync(count);
}, async message =>
{
    await ProcessMessageAsync(message);
}, fetchCount: 10, maxDegreeOfParallelism: 100, cts.Token);
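
The usage example creates a `CancellationTokenSource` but does not show how the loop is stopped. In the implementation above, the token is only observed by `semaphore.WaitAsync(cancellationToken)`, so a cancellation request takes effect the next time the loop waits for free slots. The `finally` block then waits for in-flight processing to complete before the `OperationCanceledException` reaches the caller. A minimal sketch of a stop routine, assuming a hypothetical helper named `ShutdownSketch.StopAsync` that is not part of the original answer:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the original answer).
public static class ShutdownSketch
{
    // Requests cancellation, then waits for the loop task to finish.
    // Returns true if the task ended because of the cancellation request.
    // Any other exception (for example a fatal fetch error) is rethrown.
    public static async Task<bool> StopAsync(
        CancellationTokenSource cts, Task executionTask)
    {
        cts.Cancel();
        try
        {
            await executionTask;
            return false; // The task had already completed on its own.
        }
        catch (OperationCanceledException)
        {
            return true; // Expected outcome of a cancellation request.
        }
    }
}
```

In a Windows service this could be called from the stop handler, for example `bool canceled = await ShutdownSketch.StopAsync(cts, executionTask);`. A fetch that is already in progress when cancellation is requested is not interrupted, because the fetch delegate does not receive the token; its messages are still processed before the loop exits.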
