信号量块,尽管它不是满的 [英] Semaphore blocks though it isn't full

查看:58
本文介绍了信号量块,尽管它不是满的的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在尝试优化一个旧的、编写得非常糟糕的类,它处理大量数据,因此很容易花费数小时来运行一组数据。收集数据已经花费了很多时间,这是我在这里试图改进的地方。我知道这是相当难闻的代码,但这只是一个测试,如果这能改善什么的话,所以请只关注这个问题:

我尝试了SemaphoreSlimSemaphore以减少并发运行的任务数量。我的数据集将生成大约70个任务,这可能会导致线程匮乏和整体性能下降。至少,它的反应变得不那么灵敏了。因此,我尝试将其保持在5个任务的同时,以获得更好的总体吞吐量。

现在,当我尝试等待我的任务进入Sempahore时,它会阻塞(使用aWait Also块的细长信号量),但它永远不会进入,即使信号量没有满。此代码位于异步方法内部,作为轻微的上下文提示。

Semaphore throttle = new Semaphore(0, 5);

try
{
    foreach (var folder in folders)
    {
        // Wait in case there are already 5 tasks running to reduce thread starvation
        collectionTasks.Add(Task.Run( () =>
        {
            // ReSharper disable once AccessToDisposedClosure
            throttle.WaitOne();
            return GetGapProfiles(folder.Value, progress, token);
        }, token).ContinueWith(
            t =>
            {
                // ReSharper disable once AccessToDisposedClosure
                throttle.Release();
                return t.Result;
            }, TaskContinuationOptions.None));
    }

    // When all are loaded concat all results into one collection
    await Task.WhenAll(collectionTasks);
}
catch (Exception ex)
{
    Log.Error(ex, "Failed to collect profiles.");
}
finally
{
    throttle.Dispose();
}

我就是不明白为什么这个会阻塞,永远不会进入GetGapProfiles。有谁能解释一下这个吗?

推荐答案

public static class perTaskThrottle
{
    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput"></typeparam>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="sourceItems"></param>
    /// <param name="func"></param>
    /// <param name="concurrentTasks"></param>
    /// <returns></returns>
    public static Task<IDictionary<TInput, TResult>> ForEachAsyncThrottled<TInput, TResult>(
        this IEnumerable<TInput> sourceItems,
        Func<TInput, Task<TResult>> func,
        int concurrentTasks = 1)
    {
        return ForEachAsyncThrottled(sourceItems, func, CancellationToken.None, concurrentTasks);
    }

    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput"></typeparam>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="sourceItems"></param>
    /// <param name="func"></param>
    /// <param name="token"></param>
    /// <param name="concurrentTasks"></param>
    /// <returns></returns>
    public static async Task<IDictionary<TInput, TResult>> ForEachAsyncThrottled<TInput, TResult>(
        this IEnumerable<TInput> sourceItems,
        Func<TInput, Task<TResult>> func,
        CancellationToken token,
        int concurrentTasks = 1)
    {
        var result = new ConcurrentDictionary<TInput, TResult>();

        var tasksList = new List<Task>();
        using (var semaphoreSlim = new SemaphoreSlim(concurrentTasks))
        {
            foreach (var item in sourceItems)
            {
                token.ThrowIfCancellationRequested();

                // if there are already concurrentTasks tasks executing, pause until one has completed ( semaphoreSlim.Release() )
                await semaphoreSlim.WaitAsync(perTimeSpanHelper.Forever, token).ConfigureAwait(false);

                token.ThrowIfCancellationRequested();

                Action<Task<TResult>> okContinuation = async task =>
                {
                    // the task has already completed if status is CompletedOk, but using await once more is safer than using task.Result
                    var taskResult = await task;
                    result[item] = taskResult;
                };

                // ReSharper disable once AccessToDisposedClosure
                Action<Task> allContinuation = task => semaphoreSlim.Release();

                tasksList.Add(func.Invoke(item)
                    .ContinueWith(okContinuation, TaskContinuationOptions.OnlyOnRanToCompletion)
                    .ContinueWith(allContinuation, token));

                token.ThrowIfCancellationRequested();
            }

            if (!token.IsCancellationRequested)
            {
                await Task.WhenAll(tasksList).ConfigureAwait(false);
            }
        }

        return result;
    }
}

所以在您的情况下,您可以使用

var results = folders.ForEachAsyncThrottled( (f) => GetGapProfiles(f.Value), token, 5);

这篇关于信号量块,尽管它不是满的的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆