如何经过长期运行的任务被取消正确清理 [英] How to correctly clean up after long running task is cancelled

查看:186
本文介绍了如何经过长期运行的任务被取消正确清理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我创建了一个类,其目的是抽象掉的一个队列的并发访问控制。



类被设计在单个线程中实例化通过多线程写入,然后从随后的单个线程读取。



我有阶级内部产生一个长期运行的任务将执行阻塞循环并触发一个。事件如果项目成功出列



我的问题是:是我执行长时间运行的任务,随后清理/重置正确用法<的取消的code> CancellationTokenSource 对象?



在理想情况下,我想一个活动对象才能够被停止,同时保持可用性重新启动要添加到队列



我用彼得布朗伯格的文章为基础:的生产者/消费者队列和BlockingCollection在C#4.0



下面的代码:

 使用系统; 
使用System.Collections.Concurrent;
使用的System.Threading;使用System.Threading.Tasks
;

命名空间测试
{
公众委托无效DeliverNextQueuedItemHandler< T>(T项目);

公共密封类SOQueueManagerT< T>
{

ConcurrentQueue< T> _multiQueue;
BlockingCollection< T> _队列;
CancellationTokenSource _canceller;
任务_listener = NULL;

公共事件DeliverNextQueuedItemHandler< T> OnNextItem;

公共BOOL IsRunning {搞定;私人集; }
公众诠释QUEUESIZE
{
得到
{
如果(_queue!= NULL)
返回_queue.Count;
返回-1;
}
}

公共Can​​cellationTokenSource CancellationTokenSource
{
得到
{
如果(_canceller == NULL)
_canceller =新CancellationTokenSource();

返回_canceller;
}
}

公共SOQueueManagerT()
{
_multiQueue =新ConcurrentQueue< T>();
_queue =新BlockingCollection< T>(_多队列);

IsRunning = FALSE;
}

公共无效启动()
{
如果(_listener == NULL)
{


IsRunning = TRUE;

_listener = Task.Factory.StartNew(()=>
{

,而(CancellationTokenSource.Token.IsCancellationRequested)
{$ B! $(b T)项目;
如果(_queue.TryTake(出项目,100))
{
如果(OnNextItem!= NULL)
{

OnNextItem(项目);
}
}

}
},
CancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}

公共无效停止()
{
如果(_listener!= NULL)
{
CancellationTokenSource。取消();
清理();
}
}

公共无效添加(T项目)
{
_queue.Add(项目);
}

私人无效清理()
{
_listener.Wait(2000年);
如果(_listener.IsCompleted)
{
IsRunning = FALSE;
_listener = NULL;
_canceller = NULL;
}
}


}
}






更新
这里是我已经跟到底。它不是完美的,但到目前为止在做的工作。



 公共密封类TaskQueueManager< T> 
{
ConcurrentQueue< T> _multiQueue;
BlockingCollection< T> _队列;
CancellationTokenSource _canceller;
任务_listener = NULL;

公共事件DeliverNextQueuedItemHandler< T> OnNextItem;

公共BOOL IsRunning
{
得到
{
如果(_listener == NULL)
返回FALSE;
,否则如果(_listener.Status == || TaskStatus.Running
_listener.Status == || TaskStatus.Created
_listener.Status == || TaskStatus.WaitingForActivati​​on
_listener .STATUS == || TaskStatus.WaitingToRun
_listener.IsCanceled)
返回真;
,否则
返回FALSE;
}
}
公众诠释QUEUESIZE
{
得到
{
如果(_queue!= NULL)
返回_queue。计数;
返回-1;
}
}

公共TaskQueueManager()
{
_multiQueue =新ConcurrentQueue< T>();
_queue =新BlockingCollection< T>(_多队列);
}

公共无效启动()
{
如果(_listener == NULL)
{
_canceller =新CancellationTokenSource();

_listener = Task.Factory.StartNew(()=>!
{
,而(_canceller.Token.IsCancellationRequested)
{$ B $(B T)项目;
如果(_queue.TryTake(出项目,100))
{
如果(OnNextItem!= NULL)
{

{
OnNextItem(项目);
}
赶上(例外五)
{
//登录,或致电事件
}
}
}
}
},
_canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}

公共无效停止()
{
如果(_listener!= NULL)
{
_canceller。取消();

如果(_listener.IsCanceled&安培;&安培;!_listener.IsCompleted)
_listener.Wait();

_listener = NULL;
_canceller = NULL;
}
}

公共无效添加(T项目)
{
如果(项目!= NULL)
{
_queue.Add(项目);
}
,否则
{
抛出新的ArgumentNullException(TaskQueueManager<+ typeof运算(T),请将.Name +>。新增项目为空);
}
}
}


解决方案

仔细编程是这会削减它的唯一的事情。即使你取消操作你可能有,这不是在一段时尚量完成挂起操作。这很可能是已陷入僵局阻塞操作。在这种情况下,你的程序实际上不会终止。



举例来说,如果我打电话给你的清除方法几倍,而​​无需调用开始首先,我越来越觉得它会崩溃。



清理期间A 2为秒Timeout,感觉比计划更随意,其实我去尽可能确保事情正常关机或死机/挂机(你永远都不想离开并发的东西,在一个未知状态)。



此外,在 IsRunning 明确设置,而不是从对象的状态推断。



有关的灵感,我想你看看相似的类我最近写了,那就是做它在后台线程工作,生产者/消费者模式。你可以找到 CodePlex上的源代码。虽然,这是设计来解决一个非常具体的问题。



下面,取消由enquing,只有工作线程识别,从而开始关闭特定类型的解决。这也保证了我从来没有取消未完成的工作,只是整个工作的单位被考虑。



要改善这种状况有点,你可以有当前工作的独立计时器并中止或者如果它取消回滚未完成的工作。现在,实施的交易的行为一样会因为你需要看看每一个可能的角落情况,并问自己采取一些试验和错误,如果程序崩溃这里会发生什么?理想的情况是所有这些代码路径因此导致从中可以恢复你的工作可恢复或已知的状态。但是当我想你已经猜到了,这是要采取谨慎编程和大量的测试。


I've created a class whose purpose is to abstract away the control of concurrent access to a queue.

The class is designed to be instantiated on a single thread, written to by multiple threads and then read from a subsequent single thread.

I have a single long running task generated inside the class which will perform a blocking loop and fire an event if an item is successfully dequeued.

My question is this: Is my implementation of the cancelling of the long running task AND subsequent clean up/reset correct usage of the CancellationTokenSource object?

Ideally, I'd like an active object to be able to be stopped and restarted while maintaining availability to add to the queue.

I've used Peter Bromberg's article as a basis: Producer/Consumer Queue and BlockingCollection in C# 4.0

Code Below:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    public delegate void DeliverNextQueuedItemHandler<T>(T item);

public sealed class SOQueueManagerT<T> 
{

    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning { get; private set; }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public CancellationTokenSource CancellationTokenSource
    {
        get
        {
            if (_canceller == null)
                _canceller = new CancellationTokenSource();

            return _canceller;
        }
    }

    public SOQueueManagerT()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);

        IsRunning = false;
    }

    public void Start()
    {
        if (_listener == null)
        {


            IsRunning = true;

            _listener = Task.Factory.StartNew(() =>
            {

                while (!CancellationTokenSource.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {

                            OnNextItem(item);
                        }
                    }

                }
            },
            CancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            CancellationTokenSource.Cancel();
            CleanUp();
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    private void CleanUp()
    {
        _listener.Wait(2000);
        if (_listener.IsCompleted)
        {
            IsRunning = false;
            _listener = null;
            _canceller = null;
        }
    }


 }
}


UPDATE Here's what I have gone with in the end. It isn't perfect but so far is doing the job.

public sealed class TaskQueueManager<T> 
{
    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning
    {
        get
        {
            if (_listener == null)
                return false;
            else if (_listener.Status == TaskStatus.Running ||
                _listener.Status == TaskStatus.Created ||
                _listener.Status == TaskStatus.WaitingForActivation ||
                _listener.Status == TaskStatus.WaitingToRun ||
                _listener.IsCanceled)
                return true;
            else
                return false;
        }
    }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public TaskQueueManager()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);
    }

    public void Start()
    {
        if (_listener == null)
        {
            _canceller = new CancellationTokenSource();

            _listener = Task.Factory.StartNew(() =>
            {
                while (!_canceller.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {
                            try
                            {
                                OnNextItem(item);
                            }
                            catch (Exception e)
                            {
                                //log or call an event
                            }
                        }
                    }
                }
            },
            _canceller.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            _canceller.Cancel();

            if (_listener.IsCanceled && !_listener.IsCompleted)
                _listener.Wait();

            _listener = null;
            _canceller = null;
        }
    }

    public void Add(T item)
    {
        if (item != null)
        {
            _queue.Add(item);
        }
        else
        {
            throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
        }
    }
}

解决方案

Careful programming is the only thing that's gonna cut it. Even if you cancel the operation you might have a pending operation that's not completing in a fashionable amount of time. It could very well be a blocking operation that's deadlocked. In this case your program will not actually terminate.

For instance, if I call your CleanUp method several times or without calling Start first I'm getting the feeling it's going to crash.

A 2 seconds timeout during cleanup, feels more arbitrary than planned, and I'd actually go as far as to ensure that things shutdown properly or crash/hang (you never want to leave concurrent stuff in an unknown state).

Also, the IsRunning is explicitly set, not inferred from the state of the object.

For inspiration I'd like you to look at a similar class I wrote recently, it's a producer/consumer pattern that does it's work in a background thread. You can find that source code on CodePlex. Though, this was engineered to solve a very specific problem.

Here, cancellation is solved by enquing a specific type that only the worker thread recognizes and thus begins shutting down. This also ensures that I never cancel pending work, only whole units of work are considered.

To improve this situation a bit you can have a separate timer for current work and abort or rollback incomplete work if it's canceled. Now, implementing a transaction like behavior is going to take some trial and error because you need to look at every possible corner case and ask yourself, what happens if the program crashes here? Ideally all these code paths so lead to a recoverable or known state from which you can resume your work. But as I think you've guessed already, that's going to take careful programming and a lot of testing.

这篇关于如何经过长期运行的任务被取消正确清理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆