How to make sure a queue message has been successfully processed in Azure Functions?


Problem description


I have a C# Azure Functions app (on the App Service plan) built with HTTP Triggers and Queue Triggers. The application works by installing a script on a client's machine that pulls various files from the client database using SQL queries and moves that output to temporary Azure Blob Storage. After each file is completed, an HTTP trigger is called that creates a queue message; the Queue Trigger picks up the message and moves the file from the temporary blob storage to a permanent spot in blob storage. After the HTTP trigger completes and puts a message in the queue, execution returns to the client script to begin processing the next SQL query.


My concern is that these queue messages will stack up and the client script will complete with a false success message when the Queue Trigger is actually still doing work or potentially failing, especially when multiple clients are being processed in parallel. Is there a way to make sure the queue message was successfully processed before moving on to the next SQL query?

Adding code examples:


I may have 3 clients with the application installed on their machines; each client is set to execute these scripts at 12 AM, and they can run concurrently since they are hosted on the client machines.

Client Scripts:

// perform SQL query to extract data from the client database
// move the extracted data to a temporary Storage Blob hosted on the App Service storage account
// POST the URI of the file in temporary blob storage to the HTTP trigger
return await httpClient.PostAsync(tempBlobUri, content); // tempBlobUri and content are placeholders
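For illustration, here is a minimal sketch of what this client step could look like, assuming the Microsoft.Azure.Storage.Blob SDK; the class and method names, the "temp-extracts" container, and the function URL are my assumptions, not the original script:

// Hypothetical sketch of the client step, not the author's actual script.
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;

public static class ClientExtractStep
{
    private static readonly HttpClient httpClient = new HttpClient();

    public static async Task RunAsync(string storageConnectionString, string localFilePath)
    {
        // upload the extracted file to the temporary blob container (container name assumed)
        var account = CloudStorageAccount.Parse(storageConnectionString);
        var container = account.CreateCloudBlobClient().GetContainerReference("temp-extracts");
        var blob = container.GetBlockBlobReference(Path.GetFileName(localFilePath));
        await blob.UploadFromFileAsync(localFilePath);

        // notify the HTTP trigger that the file is ready; the function URL is an assumption
        var response = await httpClient.PostAsync(
            "https://myfuncapp.azurewebsites.net/api/SubmitJob",
            new StringContent(blob.Uri.ToString()));
        response.EnsureSuccessStatusCode();
    }
}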


That await on PostAsync calls the HTTP trigger once the file is ready to be processed.

Azure Functions HTTP Trigger:

// get storage account credentials
// write message to storage queue "job-submissions"
return new OkResult();
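A hedged sketch of such an HTTP trigger, using a Queue output binding so the runtime handles the storage credentials; only the "job-submissions" queue name comes from the question, while the function name and the way the blob URI is passed are assumptions:

// Hypothetical sketch of the HTTP trigger.
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class SubmitJob
{
    [FunctionName("SubmitJob")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
        [Queue("job-submissions")] IAsyncCollector<string> jobQueue, // output binding to the storage queue
        ILogger log)
    {
        // the request body carries the URI of the file in temporary blob storage
        string tempBlobUri = await new StreamReader(req.Body).ReadToEndAsync();

        // enqueue a message for the queue trigger to pick up
        await jobQueue.AddAsync(tempBlobUri);

        log.LogInformation("Queued job for {uri}", tempBlobUri);
        return new OkResult();
    }
}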


Now we have files from multiple clients in the "job-submissions" queue.
Azure Functions Queue Trigger:

// pick up message from "job-submissions" queue
// use the Microsoft.Azure.Storage.Blob library to move files
// to a permanent spot in the data lake
// create meta file with info about the file 
// meta file contains info for when the extraction started and completed
// delete the temporary file
// job completed and the next queue message can be picked up  
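A hedged sketch of what that queue trigger might look like, assuming the blob URI is the queue message payload; the "data-lake" container name and the copy-then-delete approach are assumptions, and the metadata file is left as a comment:

// Hypothetical sketch of the queue trigger; only the queue name and the
// Microsoft.Azure.Storage.Blob library come from the question.
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ProcessJob
{
    [FunctionName("ProcessJob")]
    public static async Task Run(
        [QueueTrigger("job-submissions")] string tempBlobUri,
        ILogger log)
    {
        var account = CloudStorageAccount.Parse(
            Environment.GetEnvironmentVariable("AzureWebJobsStorage"));
        var blobClient = account.CreateCloudBlobClient();

        // source: the temporary blob written by the client script
        var source = new CloudBlockBlob(new Uri(tempBlobUri), blobClient.Credentials);

        // destination: a permanent spot in the data lake (container name assumed)
        var destContainer = blobClient.GetContainerReference("data-lake");
        var dest = destContainer.GetBlockBlobReference(source.Name);

        // server-side copy, then wait for the copy to finish before deleting the source
        await dest.StartCopyAsync(source);
        while (dest.CopyState.Status == CopyStatus.Pending)
        {
            await Task.Delay(500);
            await dest.FetchAttributesAsync();
        }

        // a small companion "meta" blob with the extraction start/completion times could be written here

        await source.DeleteIfExistsAsync();
        log.LogInformation("Moved {name} to permanent storage", source.Name);
    }
}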


So the issue is, when the HTTP trigger writes a message to the queue, I have no way of knowing that the queue has finished processing the file. Right now this isn't a big issue because the process happens so quickly that by the time I have sent a message to the queue in the HTTP trigger, it only takes at most a few seconds for the queue to process the file. The reason I would like to know when the individual jobs have completed is because I have a final step in the client scripts:
Client Scripts:

// after all jobs for a client have been submitted by HTTP
// get storage account credentials
// write message to a queue "client-tasks-completed" 
// queue message contains client name in the message 
// initialVisibilityDelay set to 2 minutes 
// this ensures queue has finished processing the files
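A hedged sketch of that final client step; the message shape (a body carrying ClientName) and the two-minute initialVisibilityDelay come from the description above, while the class and method names are assumptions:

// Hypothetical sketch of the client's final step.
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using Newtonsoft.Json;

public static class ClientCompletionStep
{
    public static async Task RunAsync(string storageConnectionString, string clientName)
    {
        var account = CloudStorageAccount.Parse(storageConnectionString);
        var queue = account.CreateCloudQueueClient().GetQueueReference("client-tasks-completed");
        await queue.CreateIfNotExistsAsync();

        // the message body carries the client name
        var message = new CloudQueueMessage(JsonConvert.SerializeObject(new { ClientName = clientName }));

        // third argument is initialVisibilityDelay: the message stays hidden for 2 minutes,
        // which is what the script relies on to let the "job-submissions" work drain first
        await queue.AddMessageAsync(message, null, TimeSpan.FromMinutes(2), null, null);
    }
}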


Then a separate Python Azure Function listens on that queue to do further processing:
Python QueueTrigger:

# pick up message from the "client-tasks-completed" queue
if queue_msg['ClientName'] == 'client1':
    # standardize information within the files and write to our Azure SQL database
    ...
elif queue_msg['ClientName'] == 'client2':
    # standardize information within the files and write to our Azure SQL database
    ...
elif queue_msg['ClientName'] == 'client3':
    # standardize information within the files and write to our Azure SQL database
    ...


The Python Azure Function is on the Consumption plan with batchSize set to 1, because the client files can sometimes be large and I don't want to exceed the 1.5 GB memory limit. So I have two issues. The first is: how can I know the first queue trigger has completed its work? The second is: how can I ensure that the Python QueueTrigger doesn't start to accumulate messages? I think both issues could potentially be solved by creating separate Azure Functions for both queue triggers, listening on the same queues. That would lighten the load on both sides, but I'm not sure if that is best practice. See my question here, where I asked for more guidance on question 2: Using multiple Azure Functions QueueTriggers to listen on the same storage queue

Answer

Update:

The snippet below illustrates the original suggestion: await the asynchronous work inside the HTTP trigger so that the function only returns its result after that work has finished.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace FunctionApp31
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            string a = "111";

            // await the asynchronous work so the function only returns after it has finished
            a = await XX(a);

            return new OkObjectResult(a);
        }

        public static async Task<string> XX(string x)
        {
            // simulate a long-running operation (about 3 seconds)
            await Task.Run(() =>
            {
                Thread.Sleep(3000);
                x = x + "222";
                Console.WriteLine(x);
            });

            return x;
        }
    }
}

Original answer:


I suggest you execute the processing logic sequentially rather than asynchronously. Alternatively, you can wait for the asynchronous operation to complete before returning, so that you can be sure the execution succeeded before reporting success. (This avoids returning a result while the queue is still processing, as you described in the comment.)
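A minimal sketch of that suggestion, assuming the HTTP trigger performs (and awaits) the move itself before returning, so the client's PostAsync only completes once the file is in its permanent spot; MoveToPermanentStorageAsync is a hypothetical stand-in for the copy/delete logic:

// Hypothetical sketch; names are assumptions, not the asker's code.
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class SubmitJobAndWait
{
    [FunctionName("SubmitJobAndWait")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
        ILogger log)
    {
        string tempBlobUri = await new StreamReader(req.Body).ReadToEndAsync();

        // await the move here instead of queueing it, so the HTTP response
        // is only sent after the file really has been moved
        await MoveToPermanentStorageAsync(tempBlobUri, log);

        return new OkResult();
    }

    private static Task MoveToPermanentStorageAsync(string tempBlobUri, ILogger log)
    {
        // stand-in for the same copy/delete logic the queue trigger performs
        log.LogInformation("Moving {uri}", tempBlobUri);
        return Task.CompletedTask;
    }
}

The trade-off is a longer-running HTTP request, but the client script then gets a genuine success or failure signal per file instead of a false success.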


I noticed that you asked a new question. I think you can scale out the instances instead of creating multiple function apps. (Of course, there is no problem with creating multiple function apps.) If you are on the Consumption plan, the instances will automatically scale out according to the load.

