将线程池队列定时器的回调函数,有时同时调度多个线程? [英] Will the threadpool queue a timer's callback function, sometimes scheduling more than one thread at the same time?

查看:507
本文介绍了将线程池队列定时器的回调函数,有时同时调度多个线程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在下面的代码 TimerRecalcStatisticsElapsed 应该只有一个它的实例运行。此回调调用的工作方法按顺序运行,每次最多运行一个线程



问题第1部分:



如果计时器的回调运行了一个线程池线程而不是在单独的线程上运行回调),是正确的说,线程池可能排队和延迟线程基于条件(MaxThreads达到,线程池内部逻辑)稍后执行?



问题第2部分



假设一个定时器回调可以排队等待立即执行,那么是否意味着任何数量的线程回调可以同时执行?



第3部分



假设第2部分是真的,这是否意味着下面的代码可以同时有多个回调操作?



我要求的原因是因为在多CPU服务器上运行此类的几千个实例。我也看到数据损坏与 // Do Work Here 的乱序操作一致。



旁白



//在这里工作在内部使用System.Collections.Dictionary,并编辑y的值。它还删除了串行调用的后续函数的一些键。该函数缺少之前在第一次调用中出现的键(x)。我认为这是因为有一个竞争条件与最终语句 obj.cleanupdata()

  public class SystemTimerTest 
{

readonly System.Timers.Timer timerRecalcStatistics;
readonly System.Diagnostics.Stopwatch stopwatchForRecalcStatistics = new System.Diagnostics.Stopwatch();


public SystemTimerTest(TimeSpan range,DataOverwriteAction action)
{
int recalculateStatisticsEveryXMillseconds = 1000;

timerRecalcStatistics = new System.Timers.Timer(recalculateStatisticsEveryXMillseconds);
timerRecalcStatistics.AutoReset = true;
timerRecalcStatistics.Elapsed + = new System.Timers.ElapsedEventHandler(TimerRecalcStatisticsElapsed);
timerRecalcStatistics.Interval = recalculateStatisticsEveryXMillseconds;
timerRecalcStatistics.Enabled = true;


this.maxRange = range;
this.hashRunningTotalDB = new HashRunningTotalDB(action);
this.hashesByDate = new HashesByDate(action);
this.dataOverwriteAction = action;
}


private void TimerRecalcStatisticsElapsed(object source,System.Timers.ElapsedEventArgs e)
{
stopwatchForRecalcStatistics.Start();
Console.WriteLine(TimerRecalcStatisticsElapsed事件在{0}引发,e.SignalTime.ToString(o));

//在这里工作


stopwatchForRecalcStatistics.Stop();
double timeBuffer = GetInterval(IntervalTypeEnum.NearestSecond,e.SignalTime) - stopwatchForRecalcStatistics.ElapsedMilliseconds;

if(timeBuffer> 0)
timerRecalcStatistics.Interval = timeBuffer;
else
timerRecalcStatistics.Interval = 1;

stopwatchForRecalcStatistics.Reset();
timerRecalcStatistics.Enabled = true;
}
}


解决方案

1)ThreadPool是否可以延迟回调方法的执行并不重要,因为无论如何,回调不能保证在另一个定时器间隔过去之前完成执行(线程可以被线程调度器挂起,或者回调可能调用长时间运行函数)。



ad 2)这是MSDN对 Timer 类:


如果SynchronizingObject属性为null,事件是在ThreadPool线程上引发的
。如果处理Elapsed事件
的持续时间超过Interval,则事件可能在另一个
ThreadPool线程上再次引发。在这种情况下,事件处理程序应该是
reentrant。


所以答案是YES,回调可以在多个



ad 3)YES。您应该避免在回调方法中使用共享资源(timerRecalcStatistics,stopwatchForRecalcStatistics),或者同步对这些共享资源的访问(例如使用lock),或者将适当的对象设置为Timer的 SynchronizingObject 属性,或将Timer的AutoReset属性设置为false(并在定时器回调结束时再次启用定时器)。

UPDATE
我的事情Jon Skeet的 answer 无法解决您的问题。另外实现你自己的SynchonizingObject是IMHO比必要的更复杂(但很难说,不知道整个问题)。我希望这个实现应该工作(但我没有测试它):

  public class MySynchronizeInvoke:ISynchronizeInvoke 
{
private object SyncObject = new Object();
私人委托对象InvokeDelegate(Delegate method,object [] args);

public IAsyncResult BeginInvoke(Delegate method,object [] args)
{
ElapsedEventHandler handler =(ElapsedEventHandler)method;
InvokeDelegate D = Invoke;
return D.BeginInvoke(handler,args,CallbackMethod,null);
}

private void CallbackMethod(IAsyncResult ar)
{
AsyncResult result = ar as AsyncResult;
if(result!= null)
((InvokeDelegate)result.AsyncDelegate).EndInvoke(ar);
}

public object EndInvoke(IAsyncResult result)
{
result.AsyncWaitHandle.WaitOne();
return null;
}

public object Invoke(Delegate method,object [] args)
{
lock(SyncObject)
{
ElapsedEventHandler handler = (ElapsedEventHandler)方法;
handler(args [0],(ElapsedEventArgs)args [1]);
return null;
}
}

public bool InvokeRequired
{
get {return true; }
}
}


In the following code TimerRecalcStatisticsElapsed should only have one instance of it running. The worker methods that this callback invokes is made to run in sequence, with a maximum of one thread running at a time.

Question Part 1:

If the timer's callback runs an a threadpool thread (as opposed to running the callback on a separate thread), is it correct to say the the threadpool might queue and defer the thread for later execution based on conditions (MaxThreads reached, threadpool internal logic)?

Question Part 2:

Assuming it's possible for one timer callback to be queued for anything but immediate execution, does that mean that any number of thread callbacks may execute concurrently?

Question Part 3

Assuming part 2 is true, does that mean the code below can ever have more than one callback operating at the same time?

The reason I'm asking is because there are several thousand instances of this class running on a multi-CPU server. I'm also seeing data corruption consistent with an out-of-order operation of // Do Work Here.

Aside

// Do work here internally works with a System.Collections.Dictionary and edits the values of y. It also removes some keys for a subsequent function that is called serially. That function is missing keys (x) that were previously present in the first call. I think this is because there is a race condition with the final statement obj.cleanupdata()

public class SystemTimerTest
   {

    readonly System.Timers.Timer timerRecalcStatistics;
    readonly System.Diagnostics.Stopwatch stopwatchForRecalcStatistics = new System.Diagnostics.Stopwatch();


    public SystemTimerTest(TimeSpan range, DataOverwriteAction action)
    {
        int recalculateStatisticsEveryXMillseconds = 1000;

        timerRecalcStatistics = new System.Timers.Timer(recalculateStatisticsEveryXMillseconds);
        timerRecalcStatistics.AutoReset = true;
        timerRecalcStatistics.Elapsed += new System.Timers.ElapsedEventHandler(TimerRecalcStatisticsElapsed);
        timerRecalcStatistics.Interval = recalculateStatisticsEveryXMillseconds;
        timerRecalcStatistics.Enabled = true;


        this.maxRange = range;
        this.hashRunningTotalDB = new HashRunningTotalDB(action);
        this.hashesByDate = new HashesByDate(action);
        this.dataOverwriteAction = action;
    }


    private void TimerRecalcStatisticsElapsed(object source, System.Timers.ElapsedEventArgs e)
    {
        stopwatchForRecalcStatistics.Start();
        Console.WriteLine("The TimerRecalcStatisticsElapsed event was raised at {0}", e.SignalTime.ToString("o"));

         // DO WORK HERE


        stopwatchForRecalcStatistics.Stop();
        double timeBuffer  = GetInterval(IntervalTypeEnum.NearestSecond, e.SignalTime) - stopwatchForRecalcStatistics.ElapsedMilliseconds;

        if (timeBuffer > 0)
            timerRecalcStatistics.Interval = timeBuffer;
        else
            timerRecalcStatistics.Interval = 1;

        stopwatchForRecalcStatistics.Reset();         
        timerRecalcStatistics.Enabled = true;
    }
 }

解决方案

ad 1) It is not important whether ThreadPool can defer execution of callback method, because anyway callback is not guaranteed to complete execution before another timer interval(s) elapses (thread can be suspended by thread scheduler for example, or callback might call long-running function).

ad 2) This is what MSDN says about Timer class:

If the SynchronizingObject property is null, the Elapsed event is raised on a ThreadPool thread. If processing of the Elapsed event lasts longer than Interval, the event might be raised again on another ThreadPool thread. In this situation, the event handler should be reentrant.

So the answer is YES, callback can be executing on multiple threads concurrently.

ad 3) YES. And you should either avoid using shared resources (timerRecalcStatistics, stopwatchForRecalcStatistics) in callback method, or synchronize access to these shared resources (for example with lock), or set appropriate object to Timer's SynchronizingObject property, or set AutoReset property of Timer to false (and enable timer again at the end of timer callback).

UPDATE: I thing that Jon Skeet's answer doesn't solve your problem. Also implementing your own SynchonizingObject is IMHO more complicated than necessary (but it's hard to say without knowing whole problem). I hope this implementation should work (but I didn't tested it):

public class MySynchronizeInvoke : ISynchronizeInvoke
{
    private object SyncObject = new Object();
    private delegate object InvokeDelegate(Delegate method, object[] args);

    public IAsyncResult BeginInvoke(Delegate method, object[] args)
    {
        ElapsedEventHandler handler = (ElapsedEventHandler)method;
        InvokeDelegate D = Invoke;
        return D.BeginInvoke(handler, args, CallbackMethod, null);
    }

    private void CallbackMethod(IAsyncResult ar)
    {
        AsyncResult result = ar as AsyncResult;
        if(result != null)
            ((InvokeDelegate)result.AsyncDelegate).EndInvoke(ar);
    }

    public object EndInvoke(IAsyncResult result)
    {
        result.AsyncWaitHandle.WaitOne();
        return null;
    }

    public object Invoke(Delegate method, object[] args)
    {
        lock(SyncObject)
        {
            ElapsedEventHandler handler = (ElapsedEventHandler)method;
            handler(args[0], (ElapsedEventArgs)args[1]);
            return null;
        }
    }

    public bool InvokeRequired
    {
        get { return true; }
    }
}

这篇关于将线程池队列定时器的回调函数,有时同时调度多个线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆