确保线程池中的任务执行顺序 [英] Ensuring task execution order in threadpool

查看:1569
本文介绍了确保线程池中的任务执行顺序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在阅读关于线程池模式,我似乎找不到通常的解决方案为以下问题。



我有时想要任务串行执行。例如,我从文件读取文本块,并且由于某种原因,我需要按照该顺序处理块。所以基本上我想消除并发某些任务



考虑这种情况下,任务与 * 需要按推入顺序处理。其他任务可以按任何顺序处理。

  push task1 
push task2
push task3 *
push task4 *
push task5
push task6 *
....
所以在

在没有这个约束的线程池的上下文中,工作正常,但很明显在这里它不是。



我想到有一些线程操作在线程特定的队列,其他在全局队列。然后,为了连续执行一些任务,我只需要将它们推送到单个线程看起来的队列。 听起来有点笨拙。



因此,这个故事中真正的问题: 您将如何确保订购这些任务



EDIT



一般问题,假设上面的情况成为

  push task1 
push task2 **
push task3 *
push task4 *
push task5
push task6 *
push task7 **
push task8 *
push task9
....
等等

我的意思是组内的任务应该顺序执行,但是团体本身可以混合。因此,您可以拥有 3-2-5-4-7



我无法访问组中的所有任务(我无法等到所有的任务在启动组之前到达)。



谢谢

解决方案

类似下面的内容将允许串行和并行任务排队,其中串行任务将被执行一个接一个,并行任务将以任何顺序执行,但并行执行。这使您能够在必要时对任务进行序列化,还具有并行任务,但是在接收任务时这样做,即,您不需要先前知道整个序列,动态地保持执行顺序。

  internal class TaskQueue 
{
private readonly object _syncObj = new object();
private readonly Queue< QTask> _tasks = new Queue< QTask>();
private int _runningTaskCount;

public void Queue(bool isParallel,Action task)
{
lock(_syncObj)
{
_tasks.Enqueue(new QTask {IsParallel = isParallel ,Task = task});
}

ProcessTaskQueue();
}

public int Count
{
get {lock(_syncObj){return _tasks.Count;}}
}

private void ProcessTaskQueue()
{
lock(_syncObj)
{
if(_runningTaskCount!= 0)return;

while(_tasks.Count> 0&& _tasks.Peek()。IsParallel)
{
QTask parallelTask​​ = _tasks.Dequeue

QueueUserWorkItem(parallelTask​​);
}

if(_tasks.Count> 0&& _runningTaskCount == 0)
{
QTask serialTask​​ = _tasks.Dequeue

QueueUserWorkItem(serialTask​​);
}
}
}

private void QueueUserWorkItem(QTask qTask)
{
Action completionTask =()=>
{
qTask.Task();

OnTaskCompleted()
};

_runningTaskCount ++;

ThreadPool.QueueUserWorkItem(_ => completionTask());
}

private void OnTaskCompleted()
{
lock(_syncObj)
{
if(--_ runningTaskCount == 0)
{
ProcessTaskQueue();
}
}
}

私人类QTask
{
public Action Task {get;组; }
public bool IsParallel {get;组; }
}
}

更新 p>

要处理具有串行和并行任务混合的任务组, GroupedTaskQueue 可以管理 TaskQueue 。再次,您不需要预先了解组,它是所有动态管理作为任务接收。

 内部类GroupedTaskQueue 
{
private readonly object _syncObj = new object();
private readonly字典< string,TaskQueue> _queues = new Dictionary< string,TaskQueue>();
private readonly string _defaultGroup = Guid.NewGuid()。ToString();

public void Queue(bool isParallel,Action task)
{
Queue(_defaultGroup,isParallel,task);
}

public void Queue(string group,bool isParallel,Action task)
{
TaskQueue queue;

lock(_syncObj)
{
if(!_queues.TryGetValue(group,out queue))
{
queue = new TaskQueue

_queues.Add(group,queue);
}
}

操作completionTask =()=>
{
task();

OnTaskCompleted(group,queue);
};

queue.Queue(isParallel,completionTask);
}

private void OnTaskCompleted(string group,TaskQueue queue)
{
lock(_syncObj)
{
if == 0)
{
_queues.Remove(group);
}
}
}
}


I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.

I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.

Consider this scenario where the tasks with * need to be processed in the order they were pushed in. The other tasks can be processed in any order.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.

I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue where a single thread looks. It does sounds a bit clumsy.

So, the real question in this long story: how would you solve this ? How would you ensure those tasks are ordered?

EDIT

As a more general problem, suppose the scenario above becomes

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7 for example.

One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).

Thank you for your time.

解决方案

Something like the following will allow serial and parallel tasks to be queued, where serial tasks will be executed one after the other, and parallel tasks will be executed in any order, but in parallel. This gives you the ability to serialize tasks where necessary, also have parallel tasks, but do this as tasks are received i.e. you do not need to know about the entire sequence up-front, execution order is maintained dynamically.

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

Update

To handle task groups with serial and parallel task mixes, a GroupedTaskQueue can manage a TaskQueue for each group. Again, you do not need to know about groups up-front, it is all dynamically managed as tasks are received.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}

这篇关于确保线程池中的任务执行顺序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆