Appropriate solution for long running computations in Azure App Service and .NET Core 3.1?


Question


What is an appropriate solution for long running computations in Azure App Service and .NET Core 3.1 in an application that has no need for a database and no IO to anything outside of this application? It is a computation task.

Specifically, the following is unreliable and needs a solution.

[Route("service")]
[HttpPost]
public Outbound Post(Inbound inbound)
{
    Debug.Assert(inbound.Message.Equals("Hello server."));
    Outbound outbound = new Outbound();
    long Billion = 1000000000;
    for (long i = 0; i < 33 * Billion; i++) // 230 seconds
        ;
    outbound.Message = String.Format("The server processed inbound object.");
    return outbound;
}

This sometimes returns a null object to the HttpClient (not shown). A smaller workload always succeeds; for example, 3 billion iterations always succeeds. A bigger number would be nice; specifically, 240 billion is a requirement.

I think in the year 2020 a reasonable goal in Azure App Service with .NET Core might be to have a parent thread count to 240 billion with the help of 8 child threads, so each child counts to 30 billion, and to have the parent divide an 8 MB inbound object into smaller inbound objects, one per child. Each child receives a 1 MB inbound and returns a 1 MB outbound to the parent. The parent re-assembles the results into an 8 MB outbound.

Obviously the elapsed time would be 12.5%, or one-eighth, of the time a single-threaded implementation would need. The time to cut up and re-assemble objects is small compared to the computation time. I am assuming the time to transmit the objects is also very small compared to the computation time, so the 12.5% expectation is roughly accurate.

If I can get 4 or 8 cores, that would be good. If I get threads that give me, say, 50% of the cycles of a core, then I would need maybe 8 or 16 threads. If each thread gives me 33% of the cycles of a core, then I would need 12 or 24 threads.
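
For reference, here is a minimal sketch of the fan-out described above, assuming plain Task.Run is available to the app; the class name, worker count, and chunk size are illustrative only, and whether it actually achieves the hoped-for one-eighth elapsed time depends on how many cores the App Service plan really provides.

using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class ParallelCountSketch
{
    public static void Main()
    {
        const long Total = 240_000_000_000;    // the 240 billion requirement
        const int Workers = 8;                 // hoped-for number of usable cores
        const long PerWorker = Total / Workers;

        var stopwatch = Stopwatch.StartNew();
        var tasks = new Task[Workers];
        for (int w = 0; w < Workers; w++)
        {
            // Each "child" counts its own slice (30 billion here).
            tasks[w] = Task.Run(() =>
            {
                for (long i = 0; i < PerWorker; i++) { }
            });
        }
        Task.WaitAll(tasks);
        Console.WriteLine($"Counted to {Total} in {stopwatch.Elapsed}.");
    }
}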

I am considering the BackgroundService class but I am looking for confirmation that this is the correct approach. Microsoft says...

BackgroundService is a base class for implementing a long running IHostedService.

Obviously, if something is long running it would be better to make it finish sooner by using multiple cores via System.Threading, but this documentation seems to mention System.Threading only in the context of starting tasks via System.Threading.Timer. My example code shows there is no timer needed in my application; an HTTP POST will serve as the occasion to do work. Typically I would use System.Threading.Thread to instantiate multiple objects in order to use multiple cores. I find the absence of any mention of multiple cores to be a glaring omission in the context of a solution for work that takes a long time, but maybe there is some reason Azure App Service doesn't deal with this matter. Perhaps I am just not able to find it in tutorials and documentation.

The initiation of the task is the illustrated HTTP POST controller. Suppose the longest job takes 10 minutes. The HTTP client (not shown) sets the timeout limit to 1000 seconds, which is much more than 10 minutes (600 seconds), in order for there to be a margin of safety. HttpClient.Timeout is the relevant property. For the moment I am presuming the HTTP timeout is a real limit, rather than some sort of non-binding (fake) limit such that some other constraint results in the user waiting 9 minutes and receiving an error message. A real binding limit is a limit for which I can say "but for this timeout it would have succeeded". If the HTTP timeout is not the real binding limit and there is something else constraining the system, I can adjust my HTTP controller to instead have three (3) POST methods: POST1 would mean start a task with the inbound object, POST2 means tell me if it is finished, and POST3 means give me the outbound object.
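
For completeness, this is roughly what the single-POST client side looks like with the raised timeout. This is a hedged sketch only, with an illustrative URL matching the /service route shown above, and (as the answer below explains) the server may still terminate the HTTP session regardless of this client-side value.

using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public static class LongPostClientSketch
{
    public static async Task<string> PostInboundAsync(object inbound)
    {
        // 1000 seconds comfortably exceeds the expected 10-minute job.
        using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(1000) };
        var content = new StringContent(
            JsonSerializer.Serialize(inbound), Encoding.UTF8, "application/json");
        // Illustrative URL; the route matches the controller in the question.
        var response = await client.PostAsync(
            "https://example.azurewebsites.net/service", content);
        return await response.Content.ReadAsStringAsync();
    }
}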


Answer

Prologue

A few years ago I ran into a pretty similar problem. We needed a service that could process large amounts of data. Sometimes the processing would take 10 seconds, other times it could take an hour.

At first we did it how your question illustrates: Send a request to the service, the service processes the data from the request and returns the response when finished.

Issues At Hand

This was fine when the job only took around a minute or less, but for anything above that, the server would shut down the session and the caller would report an error.

Servers have a default of around 2 minutes to produce a response before giving up on the request. The server doesn't quit processing the request... but it does quit the HTTP session. It doesn't matter what parameters you set on your HttpClient; the server is the one that dictates how long is too long.

Reasons For Issues

All this is for good reasons. Server sockets are extremely expensive. You have a finite number to go around. The server is trying to protect your service by severing requests that are taking longer than a specified time in order to avoid socket starvation issues.

Typically you want your HTTP requests to take only a few milliseconds. If they are taking longer than this, you will eventually run into socket issues if your service has to fulfil other requests at a high rate.

Solution

We decided to go the route of IHostedService, specifically the BackgroundService. We use this service in conjunction with a Queue. This way you can set up a queue of jobs and the BackgroundService will process them one at a time (in some instances we have the service processing multiple queue items at once; in others we scaled horizontally, producing two or more queues).

Why an ASP.NET Core service running a BackgroundService? I wanted to handle this without tightly-coupling to any Azure-specific constructs in case we needed to move out of Azure to some other cloud service (back in the day we were contemplating this for other reasons we had at the time.)

This has worked out quite well for us and we haven't seen any issues since.

The work flow goes like this:

  1. Caller sends a request to the service with some parameters
  2. Service generates a "job" object and returns an ID immediately via 202 (accepted) response
  3. Service places this job in to a queue that is being maintained by a BackgroundService
  4. Caller can query the job status and get information about how much has been done and how much is left to go using this job ID
  5. Service finishes the job, puts the job in to a "completed" state and goes back to waiting on the queue to produce more jobs

Keep in mind your service has the capability to scale horizontally where there would be more than one instance running. In this case I am using Redis Cache to store the state of the jobs so that all instances share the same state.

I also added in a "Memory Cache" option to test things locally if you don't have a Redis Cache available. You could run the "Memory Cache" service on a server, just know that if it scales then your data will be inconsistent.
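
A minimal sketch of how that switch might be wired up in Startup is shown below. RedisComputationJobStatusService and InMemoryComputationJobStatusService are assumed names rather than the actual classes from the linked solution, and the Redis registration assumes the Microsoft.Extensions.Caching.StackExchangeRedis package is referenced.

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public Startup(IConfiguration configuration) => Configuration = configuration;

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        var redisConnection = Configuration.GetConnectionString("Redis");
        if (!string.IsNullOrWhiteSpace(redisConnection))
        {
            // Shared job state across all scaled-out instances.
            services.AddStackExchangeRedisCache(o => o.Configuration = redisConnection);
            services.AddSingleton<IComputationJobStatusService, RedisComputationJobStatusService>();
        }
        else
        {
            // Local testing only: job state lives in this single instance's memory.
            services.AddDistributedMemoryCache();
            services.AddSingleton<IComputationJobStatusService, InMemoryComputationJobStatusService>();
        }
    }
}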

Example

Since I'm married with kids, I really don't do much on Friday nights after everyone goes to bed, so I spent some time putting together an example that you can try out. The full solution is also available for you to try out.

QueuedBackgroundService.cs

This class implementation serves two specific purposes: One is to read from the queue (the BackgroundService implementation), the other is to write to the queue (the IQueuedBackgroundService implementation).

public interface IQueuedBackgroundService
{
    Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
}

public sealed class QueuedBackgroundService : BackgroundService, IQueuedBackgroundService
{
    private sealed class JobQueueItem
    {
        public string JobId { get; set; }
        public JobParametersModel JobParameters { get; set; }
    }

    private readonly IComputationWorkService _workService;
    private readonly IComputationJobStatusService _jobStatusService;

    // Shared between BackgroundService and IQueuedBackgroundService.
    // The queueing mechanism could be moved out to a singleton service. I am doing
    // it this way for simplicity's sake.
    private static readonly ConcurrentQueue<JobQueueItem> _queue =
        new ConcurrentQueue<JobQueueItem>();
    private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public QueuedBackgroundService(IComputationWorkService workService,
        IComputationJobStatusService jobStatusService)
    {
        _workService = workService;
        _jobStatusService = jobStatusService;
    }

    /// <summary>
    /// Transient method via IQueuedBackgroundService
    /// </summary>
    public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
    {
        var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
        _queue.Enqueue(new JobQueueItem { JobId = jobId, JobParameters = jobParameters });
        _signal.Release(); // signal for background service to start working on the job
        return new JobCreatedModel { JobId = jobId, QueuePosition = _queue.Count };
    }

    /// <summary>
    /// Long running task via BackgroundService
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while(!stoppingToken.IsCancellationRequested)
        {
            JobQueueItem jobQueueItem = null;
            try
            {
                // wait for the queue to signal there is something that needs to be done
                await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);

                // dequeue the item
                jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;

                if(jobQueueItem != null)
                {
                    // put the job in to a "processing" state
                    await _jobStatusService.UpdateJobStatusAsync(
                        jobQueueItem.JobId, JobStatus.Processing).ConfigureAwait(false);

                    // the heavy lifting is done here...
                    var result = await _workService.DoWorkAsync(
                        jobQueueItem.JobId, jobQueueItem.JobParameters,
                        stoppingToken).ConfigureAwait(false);

                    // store the result of the work and set the status to "finished"
                    await _jobStatusService.StoreJobResultAsync(
                        jobQueueItem.JobId, result, JobStatus.Success).ConfigureAwait(false);
                }
            }
            catch(TaskCanceledException)
            {
                break;
            }
            catch(Exception ex)
            {
                try
                {
                    // something went wrong. Put the job in to an errored state and continue on
                    await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId, new JobResultModel
                    {
                        Exception = new JobExceptionModel(ex)
                    }, JobStatus.Errored).ConfigureAwait(false);
                }
                catch(Exception)
                {
                    // TODO: log this
                }
            }
        }
    }
}

It is injected as so:

services.AddHostedService<QueuedBackgroundService>();
services.AddTransient<IQueuedBackgroundService, QueuedBackgroundService>();
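
The models and the two service contracts referenced above (IComputationJobStatusService and IComputationWorkService) are not included in the answer itself; the shapes below are a sketch inferred from how they are used in the code, and the actual types in the linked solution may differ.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public enum JobStatus { Queued, Processing, Success, Errored }

public class JobParametersModel
{
    // For the demo: how many simulated 1-second iterations to run.
    public int Iterations { get; set; }
}

public class JobCreatedModel
{
    public string JobId { get; set; }
    public int QueuePosition { get; set; }
}

public class JobExceptionModel
{
    public JobExceptionModel() { }

    public JobExceptionModel(Exception ex)
    {
        Message = ex.Message;
        StackTrace = ex.StackTrace;
    }

    public string Message { get; set; }
    public string StackTrace { get; set; }
}

public class JobResultModel
{
    public string Message { get; set; }
    public JobExceptionModel Exception { get; set; }
}

public class JobModel
{
    public string JobId { get; set; }
    public JobStatus Status { get; set; }
    public JobParametersModel Parameters { get; set; }
    public JobResultModel Result { get; set; }
}

public interface IComputationJobStatusService
{
    Task<string> CreateJobAsync(JobParametersModel jobParameters);
    Task UpdateJobStatusAsync(string jobId, JobStatus status);
    Task StoreJobResultAsync(string jobId, JobResultModel result, JobStatus status);
    Task<JobModel> GetJobAsync(string jobId);
    Task<IReadOnlyDictionary<string, JobModel>> GetAllJobsAsync();
    Task ClearAllJobsAsync();
}

public interface IComputationWorkService
{
    Task<JobResultModel> DoWorkAsync(string jobId, JobParametersModel jobParameters,
        CancellationToken cancellationToken);
}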

ComputationController.cs

The controller used to read/write jobs looks like this:

[ApiController, Route("api/[controller]")]
public class ComputationController : ControllerBase
{
    private readonly IQueuedBackgroundService _queuedBackgroundService;
    private readonly IComputationJobStatusService _computationJobStatusService;

    public ComputationController(
        IQueuedBackgroundService queuedBackgroundService,
        IComputationJobStatusService computationJobStatusService)
    {
        _queuedBackgroundService = queuedBackgroundService;
        _computationJobStatusService = computationJobStatusService;
    }

    [HttpPost, Route("beginComputation")]
    [ProducesResponseType(StatusCodes.Status202Accepted, Type = typeof(JobCreatedModel))]
    public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
    {
        return Accepted(
            await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
    }

    [HttpGet, Route("computationStatus/{jobId}")]
    [ProducesResponseType(StatusCodes.Status200OK, Type = typeof(JobModel))]
    [ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(string))]
    public async Task<IActionResult> GetComputationResultAsync(string jobId)
    {
        var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
        if(job != null)
        {
            return Ok(job);
        }
        return NotFound($"Job with ID `{jobId}` not found");
    }

    [HttpGet, Route("getAllJobs")]
    [ProducesResponseType(StatusCodes.Status200OK,
        Type = typeof(IReadOnlyDictionary<string, JobModel>))]
    public async Task<IActionResult> GetAllJobsAsync()
    {
        return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
    }

    [HttpDelete, Route("clearAllJobs")]
    [ProducesResponseType(StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status401Unauthorized)]
    public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
    {
        if(permission == "this is flakey security so this can be run as a public demo")
        {
            await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
            return Ok();
        }
        return Unauthorized();
    }
}

Working Example

For as long as this question is active, I will maintain a working example you can try out. For this specific example, you can specify how many iterations you would like to run. To simulate long-running work, each iteration is 1 second. So, if you set the iteration value to 60, it will run that job for 60 seconds.
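
The work service itself is not shown in the answer; a hedged sketch of the 1-second-per-iteration simulation, using the model shapes assumed earlier, might look like this (the real example presumably also reports progress through the job status service).

using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class SimulatedComputationWorkService : IComputationWorkService
{
    public async Task<JobResultModel> DoWorkAsync(string jobId,
        JobParametersModel jobParameters, CancellationToken cancellationToken)
    {
        for (var i = 0; i < jobParameters.Iterations; i++)
        {
            // Each iteration "costs" one second, as described above.
            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
        }

        return new JobResultModel
        {
            Message = $"Completed {jobParameters.Iterations} simulated iterations."
        };
    }
}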

While it's running, run the computationStatus/{jobId} or getAllJobs endpoint. You can watch all the jobs update in real time.
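
A hedged client-side sketch of that flow, with an illustrative base URL: start a job via beginComputation, then poll computationStatus/{jobId} and print whatever comes back (the exact JSON field casing depends on the serializer settings of the demo).

using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public static class ComputationClientSketch
{
    private static readonly HttpClient Http = new HttpClient
    {
        BaseAddress = new Uri("https://example.azurewebsites.net/")
    };

    public static async Task RunAsync(int iterations)
    {
        // Begin the job; the response is the JobCreatedModel with the job ID.
        var body = new StringContent(
            JsonSerializer.Serialize(new { Iterations = iterations }),
            Encoding.UTF8, "application/json");
        var created = await Http.PostAsync("api/computation/beginComputation", body);
        using var createdJson = JsonDocument.Parse(await created.Content.ReadAsStringAsync());
        var jobId = createdJson.RootElement.GetProperty("jobId").GetString();

        // Poll the status endpoint and print the raw JSON as the job progresses.
        for (var attempt = 0; attempt < 30; attempt++)
        {
            await Task.Delay(TimeSpan.FromSeconds(2));
            var status = await Http.GetAsync($"api/computation/computationStatus/{jobId}");
            Console.WriteLine(await status.Content.ReadAsStringAsync());
        }
    }
}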

This example is far from a fully-functioning-covering-all-edge-cases-full-blown-ready-for-production example, but it's a good start.

Conclusion

After a few years of working in the back-end, I have seen a lot of issues arise from not knowing all the "rules" of the back-end. Hopefully this answer will shed some light on issues I had in the past, and hopefully it saves you from having to deal with said problems.
