How to update the progress bar from tasks running concurrently

Problem Description

I am trying to bind parallel tasks to a ListView that contains progress bars. I am using a limited scheduler that only allows a specified maximum degree of parallelism. So far it works fine most of the time, but occasionally two tasks update the same progress bar in the ListView. Below is my code.

Any idea how to prevent two tasks from updating the same progress bar in the ListView? Or how to update the progress bar from tasks running concurrently?

public class MyClass
{
  public ObservableCollection<StatusInfo> StatusItems { get; set; }
  private Object thisLock = new Object();

  public int Process() //handled
  {
    StatusItems = new ObservableCollection<StatusInfo>();
    for (int i = 0; i < 4; i++)  // initialize progress bar collection
    {
      StatusInfo sInfo = new StatusInfo();
      sInfo.ThreadID = i;
      sInfo.Table = "No Table";
      sInfo.Percentage = 0;
      sInfo.Status = AppInfo.AVAILABLE;
      sInfo.Minimum = 0;
      sInfo.Maximum = 100;
      sInfo.Visibility = Visibility.Visible;
      StatusItems.Add(sInfo);
    }
    Parent.StatusItems = StatusItems; // assign to viewmodel

    int numberOfBackGroundThread = 4;
    LimitedTaskScheduler scheduler = new LimitedTaskScheduler(numberOfBackGroundThread);
    TaskFactory factory = new TaskFactory(scheduler);
    var ui = LimitedTaskScheduler.FromCurrentSynchronizationContext();

    Task[] tasks = new Task[rows.Count];
    for (int i = 0; i < rows.Count; i++)
    {
      ......
      tasks[i] = factory.StartNew<string>(() =>
      {
        int barIndex = -1;
        int threadID = Thread.CurrentThread.ManagedThreadId;
        cnt++;
        if (cnt > numberOfBackGroundThread - 1)
        {
          while (true)
          {                 
            for (int j = 0; j < numberOfBackGroundThread; j++)
            {
              lock (thisLock)
              {
                if (StatusItems[j].Status == "AVAILABLE" || StatusItems[j].Status == "DONE") 
                {
                  //Console.WriteLine(String.Format("Current table = {0}, Current table type = {1}, BarIndex = {2}, ThreadID = {3}, Prev TableName = {4}, Prev TableType = {5}, Prev Status = {6} PrevThreadID = {7}", tabName, tabType, j.ToString(), Thread.CurrentThread.ManagedThreadId.ToString(), StatusItems[j].Table, StatusItems[j].TabType, StatusItems[j].Status, StatusItems[j].ThreadID));
                  // 0. Assign the current task to a display slot StatusItems[j] that is currently free.
                  // 1. This block is locked so that another concurrent task cannot pick the same slot.
                  // 2. The slot is marked "Occupied" immediately, again to stop another concurrent task from picking it.
                  // 3. One extra slot is kept available so concurrent tasks compete less for the same display slot and wait less on the lock.
                  //    Even so, all of the above cannot completely prevent two threads from using the same display slot.
                  // 4. Since multiple tasks may run on the same thread, the same thread ID may appear on different display slots.
                  StatusItems[j].Status = "Occupied";
                  barIndex = j;
                  break; // break for loop
                }
              }
            }
            if (barIndex >= 0) { break; }  // break while loop
          }
        }
        else { barIndex = cnt; }

        StatusItems[barIndex].TabType = tabType;
        StatusItems[barIndex].ThreadID = threadID;

        int nStatus = IndividualProcess(barIndex);
        if (nStatus < 0) { AppInfo.JobStatus = "01"; }
        return result;
      });
    }
    var done = factory.ContinueWhenAll(tasks, completedTasks => { AppInfo.Finished = true; });
    done.ContinueWith(completedTasks => { int nStatus = PostProcess(); }, ui);
    return returnStatus;
  }

  private int IndividualProcess(int barIndex)
  {
     for (int i = 0; i < 100; i++)
     {
       // ... perform work ...
       SetProgressbar(i, StatusItems, barIndex, "in progress");
     }
     SetProgressbar(100, StatusItems, barIndex, "DONE");
     return 0; // the actual return value is elided in the original post
  }

  public void SetProgressbar(int pPercent, ObservableCollection<StatusInfo> pInfo, int pInfoIndex, string pStatus)
  {
    try // Increment percentage for COPY or nested PURGE 
    {
      if (Application.Current.Dispatcher.Thread != System.Threading.Thread.CurrentThread)
      {
        Application.Current.Dispatcher.BeginInvoke(new Action(() =>
        {
          ((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
          ((StatusInfo)pInfo[pInfoIndex]).Status = pStatus; 
          ((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
        }));
      }
      else // When the current thread is the main UI thread, the label won't be updated until EntityCopy() finishes.
      {
        ((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
        ((StatusInfo)pInfo[pInfoIndex]).Status = pStatus; 
        ((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
      }
    }
    catch { throw; }
  }      
}

public class LimitedTaskScheduler : TaskScheduler
{
  // Fields
  // Whether the current thread is processing work items.
  [ThreadStatic]
  private static bool _currentThreadIsProcessingItems;
  // The list of tasks to be executed. 
  private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) 
  /// <summary>The maximum concurrency level allowed by this scheduler.</summary> 
  private readonly int _maxDegreeOfParallelism;
  /// <summary>Whether the scheduler is currently processing work items.</summary> 
  private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) 

  /// <summary> 
  /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the 
  /// specified degree of parallelism. 
  /// </summary> 
  /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
  public LimitedTaskScheduler(int maxDegreeOfParallelism)
  {
      if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
      _maxDegreeOfParallelism = maxDegreeOfParallelism;
  }

  /// <summary>Queues a task to the scheduler.</summary> 
  /// <param name="task">The task to be queued.</param>
  protected sealed override void QueueTask(Task task)
  {
      // Add the task to the list of tasks to be processed.  If there aren't enough 
      // delegates currently queued or running to process tasks, schedule another. 
    lock (_tasks)
      {
          _tasks.AddLast(task);
          if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
          {
              ++_delegatesQueuedOrRunning;
              NotifyThreadPoolOfPendingWork();
          }
      }
  }

  /// <summary> 
  /// Informs the ThreadPool that there's work to be executed for this scheduler. 
  /// </summary> 
  private void NotifyThreadPoolOfPendingWork()
  {
      ThreadPool.UnsafeQueueUserWorkItem(_ =>
      {
          // Note that the current thread is now processing work items. 
          // This is necessary to enable inlining of tasks into this thread.
          _currentThreadIsProcessingItems = true;
          try
          {
              // Process all available items in the queue. 
              while (true)
              {
                  Task item;
                  lock (_tasks)
                  {
                      // When there are no more items to be processed, 
                      // note that we're done processing, and get out. 
                      if (_tasks.Count == 0)
                      {
                          --_delegatesQueuedOrRunning;
                          break;
                      }

                      // Get the next item from the queue
                      item = _tasks.First.Value;
                      _tasks.RemoveFirst();
                  }

                  // Execute the task we pulled out of the queue 
                  base.TryExecuteTask(item);
              }
          }
          // We're done processing items on the current thread 
          finally { _currentThreadIsProcessingItems = false; }
      }, null);
  }

  /// <summary>Attempts to execute the specified task on the current thread.</summary> 
  /// <param name="task">The task to be executed.</param>
  /// <param name="taskWasPreviouslyQueued"></param>
  /// <returns>Whether the task could be executed on the current thread.</returns> 
  protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  {
      // If this thread isn't already processing a task, we don't support inlining 
      if (!_currentThreadIsProcessingItems) return false;

      // If the task was previously queued, remove it from the queue 
      if (taskWasPreviouslyQueued) TryDequeue(task);

      // Try to run the task. 
      return base.TryExecuteTask(task);
  }

  /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> 
  /// <param name="task">The task to be removed.</param>
  /// <returns>Whether the task could be found and removed.</returns> 
  protected sealed override bool TryDequeue(Task task)
  {
      lock (_tasks) return _tasks.Remove(task);
  }

  /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> 
  public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

  /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> 
  /// <returns>An enumerable of the tasks currently scheduled.</returns> 
  protected sealed override IEnumerable<Task> GetScheduledTasks()
  {
      bool lockTaken = false;
      try
      {
          Monitor.TryEnter(_tasks, ref lockTaken);
          if (lockTaken) return _tasks.ToArray();
          else throw new NotSupportedException();
      }
      finally
      {
          if (lockTaken) Monitor.Exit(_tasks);
      }
  }
}

Update: The task scheduler may not be relevant. I include it here just in case someone spots something I never thought of doing, or something I missed.

Answer

"Any idea how to prevent two tasks from updating the same progress bar in the listView? Or how to update the progress bar from tasks running concurrently?"

If you have more than one task running in parallel, and you really want to show the progress of each individual task to the user, you'd need to show a separate progress bar for each task.

How you'd do it depends on the UI structure. For example, if you have a ListView where each item is a task, you can add a progress bar as a child window for each ListView item.

That may be overkill, though; usually it's enough to have a single progress bar to track the overall progress. That is, if you have 100 tasks, the progress bar will show 100% when all 100 tasks have completed.

Note, though, that task #50 (for example) may complete before task #45, so you can't use a task's number to update the progress. To show the progress correctly, count the completed tasks and use that counter as the progress indicator.
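For illustration, here is a minimal sketch of that idea (the class and method names, and the use of Progress<int>, are my own assumptions rather than code from the question): each task increments a shared counter as it finishes and reports the new count, which drives a single overall progress bar.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class OverallProgress
{
    // Sketch: one overall progress bar driven by a count of completed tasks.
    // The IProgress<int> is assumed to be a Progress<int> created on the UI thread,
    // so its callback runs there (e.g. overallBar.Value = n).
    public static async Task RunWithOverallProgressAsync(
        IEnumerable<Func<Task>> workItems, IProgress<int> progress)
    {
        int completed = 0;
        var tasks = workItems.Select(async work =>
        {
            await work();
            // Interlocked avoids losing updates when several tasks finish at the same time.
            int done = Interlocked.Increment(ref completed);
            progress.Report(done);
        }).ToList();

        await Task.WhenAll(tasks);
    }
}

With overallBar.Maximum set to the number of tasks and new Progress<int>(n => overallBar.Value = n) created on the UI thread, the bar fills as tasks complete, regardless of the order in which they finish.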

Updated to address the comment:

"I have 4 progress bars in the listview and 500 tasks. At any given time there are only 4 tasks running concurrently, due to the limited scheduler. I try to assign a progress bar in the listview with free status to a new incoming task, and then set the status of the progress bar back to free when the task is done, so that the progress bar can be reused by another new incoming task. Does that make sense? Or am I heading for a dead end?"

I'm not sure this is a sensible UI design decision, but if you want it that way, you can use SemaphoreSlim to allocate a progress bar as a limited resource. Example:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace TaskProgress
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        private async void Form1_Load(object sender, EventArgs e)
        {
            await DoWorkAsync();
        }

        const int MAX_PARALLEL = 4;

        readonly object _lock = new Object();

        readonly SemaphoreSlim _semaphore = 
            new SemaphoreSlim(initialCount: MAX_PARALLEL);

        HashSet<Task> _pendingTasks;

        Queue<ProgressBar> _availableProgressBars;

        // queue and await all the tasks
        async Task DoWorkAsync()
        {
            _availableProgressBars = new Queue<ProgressBar>();
            _pendingTasks = new HashSet<Task>();

            var progressBars = new ProgressBar[] { 
                this.progressBar1, 
                this.progressBar2, 
                this.progressBar3, 
                this.progressBar4 };

            foreach (var item in progressBars)
                _availableProgressBars.Enqueue(item);

            for (int i = 0; i < 50; i++) // start 50 tasks
                QueueTaskAsync(DoTaskAsync());

            await Task.WhenAll(WithLock(() => _pendingTasks.ToArray()));
        }

        readonly Random _random = new Random(Environment.TickCount);

        // run a single task, using one of the progress bars while it executes
        async Task DoTaskAsync()
        {
            await _semaphore.WaitAsync();
            try
            {
                var progressBar = WithLock(() => _availableProgressBars.Dequeue());
                try
                {
                    progressBar.Maximum = 100;
                    progressBar.Value = 0;
                    IProgress<int> progress = 
                        new Progress<int>(value => progressBar.Value = value);

                    await Task.Run(() =>
                    {
                        // our simulated work takes no more than 10s
                        var sleepMs = _random.Next(10000) / 100;
                        for (int i = 0; i < 100; i++)
                        {
                            Thread.Sleep(sleepMs); // simulate work item
                            progress.Report(i);
                        }
                    });
                }
                finally
                {
                    WithLock(() => _availableProgressBars.Enqueue(progressBar));
                }
            }
            finally
            {
                _semaphore.Release();
            }
        }

        // Add/remove a task to the list of pending tasks
        async void QueueTaskAsync(Task task)
        {
            WithLock(() => _pendingTasks.Add(task));
            try
            {
                await task;
            }
            catch
            {
                if (!task.IsCanceled && !task.IsFaulted)
                    throw;
                return;
            }
            WithLock(() => _pendingTasks.Remove(task));
        }

        // execute func inside a lock
        T WithLock<T>(Func<T> func)
        {
            lock (_lock)
                return func();
        }

        // execute action inside a lock
        void WithLock(Action action)
        {
            lock (_lock)
                action();
        }

    }
}

This code doesn't use a custom task scheduler; SemaphoreSlim is enough to limit the concurrency. Note also that the protective locks (WithLock) are redundant here, because everything outside the Task.Run lambda runs on the UI thread. However, I decided to keep the locks, as your application may have a different threading model. In that case, wherever you access a ProgressBar or any other UI element, you should use BeginInvoke or a similar mechanism to do it on the UI thread.
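As a small illustration of that last point (a sketch only; SetBarValue is a made-up helper name), a WinForms control can be updated safely from a background thread with InvokeRequired/BeginInvoke, which mirrors what the question's SetProgressbar does with the WPF Dispatcher:

using System;
using System.Windows.Forms;

static class UiUpdates
{
    // Sketch: update a WinForms ProgressBar from a non-UI thread.
    public static void SetBarValue(ProgressBar bar, int value)
    {
        if (bar.InvokeRequired)
        {
            // Marshal the assignment onto the UI thread that owns the control.
            bar.BeginInvoke(new Action(() => bar.Value = value));
        }
        else
        {
            bar.Value = value;
        }
    }
}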

Also, check "Async in 4.5: Enabling Progress and Cancellation in Async APIs" for more details about the Progress<T> pattern.
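For reference, the progress-and-cancellation pattern that article describes looks roughly like this (a sketch under my own naming; DoWorkAsync and its parameters are illustrative): the caller supplies an IProgress<int> and a CancellationToken, and the worker reports progress and honors the token.

using System;
using System.Threading;
using System.Threading.Tasks;

static class ProgressPattern
{
    // Sketch: report progress and support cooperative cancellation.
    public static Task DoWorkAsync(IProgress<int> progress, CancellationToken token)
    {
        return Task.Run(() =>
        {
            for (int i = 1; i <= 100; i++)
            {
                token.ThrowIfCancellationRequested(); // stop if the caller cancelled
                Thread.Sleep(50);                      // simulated work item
                progress.Report(i);                    // runs on the context captured by Progress<T>
            }
        }, token);
    }
}

Called from the UI thread as await DoWorkAsync(new Progress<int>(v => bar.Value = v), cts.Token), the progress reports arrive back on that thread.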
