编码/设计通用线程安全限制器(即,每秒将X()的执行次数限制为Y次) [英] Coding/Designing a generic thread-safe limiter (i.e. limit the execution of X() to Y many times per second)

查看:88
本文介绍了编码/设计通用线程安全限制器(即,每秒将X()的执行次数限制为Y次)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正计划设计一个类,以将函数的执行在指定时间内限制为给定的数量,例如:

I'm planning to design a class to limit the execution of a function to given amount within the specified time, for example:

  • 最大处理1秒内5个文件

它应该是线程安全的,并且对性能的影响应该最小.

It should be thread-safe and performance hit should be minimal.

您将如何设计此类课程?我有几个主意,但是对我来说,没有一个是对的.

How would you design such a class? I've got couple of ideas but none of them seemed right to me.

此任务是否有已知的设计模式? (我正在编码.NET,但任何语言都可以)

Is there any known design pattern for such a task? (I'm coding .NET but any language is OK)

外部类应该像这样工作(假设它是单例的):

From the outside class should work like this (assuming it's singleton):

设置:

Limiter.Instance.MaxExecutionPerSecond = 5

然后,我们将在执行函数之前在线程内调用此函数,如果需要,它将阻塞线程:

Then we would call this within threads before executing our function and if it's required it'd block the thread:

Limiter.Instance.WaitIfRequired()

推荐答案

像这样吗?

using Timer = System.Threading.Timer;

class Limiter{
    public static readonly Limiter Instance = new Limiter();

    Limiter(){}

    int         max;
    Semaphore   counter;
    List<Timer> timers  = new List<Timer>();

    // warning: not thread safe!
    public int MaxExecutionPerSecond{
        get{return max;}
        set{counter = new Semaphore(max = value, value);}
    }

    public void WaitIfRequired(){
        // Our semaphore starts with a count of MaxExecutionPerSecond.
        // When we call WaitOne(), it decrements the count.  If the count
        // is already zero, the call to WaitOne() will block until another
        // thread calls Release() to increment the count.
        counter.WaitOne();

        // Set a timer to increment the semaphore in one second.
        Timer t = null;
        t = new Timer(o=>{
            // Increment the semaphore.
            counter.Release();

            // We no longer need to protect this timer from the GC.
            timers.Remove(t);
            t.Dispose();
        });

        // Hold a reference to this timer to keep it from being disposed of.
        timers.Add(t);

        // Set the timer to release the semaphore in one second.
        t.Change(1000, Timeout.Infinite);
    }
}


编辑

要记住的一件事是,上面的代码只会阻止许多线程一次启动.如果线程长时间运行,那么仍然有可能同时有多个线程正在运行.例如,如果您每秒启动5个线程,但每个线程运行1秒,那么在2秒后的任何给定时间,理论上您将有10个线程在运行.

One thing to keep in mind is that the above code will only prevent many threads starting at once. If the threads are long running, then it's still possible to have many threads running at once. For instance, if you start 5 threads per second, but each thread runs for 1 second, then at any given time after 2 seconds, you'll theoretically have 10 threads running.

如果要确保一次运行的线程数量不超过5个,最简单的方法是省去自定义的Limiter类,而直接使用Semaphore.

If you want to make sure you never have more than 5 threads running at once, the simplest thing to do is to dispense with the custom Limiter class, and just use a Semaphore directly.

const int maxThreadCount = 5;
static Semaphore counter = new Semaphore(maxThreadCount, maxThreadCount);

static void NewThread(object state){
    counter.WaitOne();

    // do something

    counter.Release();
}

现在,这非常简单.需要注意的是:创建新线程并立即将其休眠通常被认为是一个坏主意.这使用系统资源来创建不执行任何操作的线程.最好将请求排队并启动新线程(或者更好的是使用线程池线程)以仅在有资格运行它们时处理它们.这更复杂,更难解决.根据设计,它可能需要用于调度程序/管理的附加线程.像这样:

Now that's just about as simple as it can be. One caveat though: creating new threads and immediately sleeping them is generally considered a bad idea. This uses system resources to create a thread that's not doing anything. It's better to queue up requests and start up new threads (or better yet, use thread pool threads) to handle them only when they are eligible to run. This is more complicated and harder to get right. Depending on design, it might require an additional thread for a scheduler/manage. Something like this:

class Limiter{
    class WorkData{
        readonly ParameterizedThreadStart action;
        readonly object                   data;

        public ParameterizedThreadStart Action{get{return action;}}
        public object                   Data  {get{return data;}}

        public WorkData(ParameterizedThreadStart action, object data){
            this.action = action;
            this.data   = data;
        }
    }

    readonly Semaphore       threadCount;
    readonly Queue<WorkData> workQueue   = new Queue<WorkData>();
    readonly Semaphore       queueCount  = new Semaphore(0, int.MaxValue);

    public Limiter(int maxThreadCount){
        threadCount = new Semaphore(maxThreadCount, maxThreadCount);
        Thread t = new Thread(StartWorkItems);
        t.IsBackground = true;
        t.Start();
    }

    void StartWorkItems(object ignored){
        while(queueCount.WaitOne() && threadCount.WaitOne()){
            WorkData wd;
            lock(workQueue)
                wd = workQueue.Dequeue();

            ThreadPool.QueueUserWorkItem(DoWork, wd);
        }
    }
    void DoWork(object state){
        WorkData wd = (WorkData)state;
        wd.Action(wd.Data);
        counter.Release();
    }

    public void QueueWork(ParameterizedThreadStart action, object data){
        lock(workQueue)
            workQueue.Enqueue(new WorkData(action, data));
        queueCount.Release();
    }
}

在此类中,我删除了singleton属性,并为构造函数指定了maxThreadCount参数.这避免了在第一类的属性中缺少线程安全性.还有其他方法可以添加线程安全性,但这是最简单的.

In this class, I've removed the singleton property and given the constructor a maxThreadCount parameter. This avoids the lack of the thread safety in the property of the first class. There are other ways thread safety can be added, but this was the simplest.

这篇关于编码/设计通用线程安全限制器(即,每秒将X()的执行次数限制为Y次)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆