正确的方法来实现资源池 [英] Correct way to implement a resource pool

查看:140
本文介绍了正确的方法来实现资源池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现一种管理资源池的方法,以便调用代码可以请求一个对象,如果有的话,将从池中提供一个对象,否则将使其等待.但是,我无法使同步正常工作.我的池类中的内容是这样的(其中autoEvent是最初设置为用信号表示的AutoResetEvent:

I'm trying to implement something that manages a pool of resources such that the calling code can request an object and will be given one from the pool if it's available, or else it will be made to wait. I'm having trouble getting the synchronization to work correctly however. What I have in my pool class is something like this (where autoEvent is an AutoResetEvent initially set as signaled:

public Foo GetFooFromPool()
{
    autoEvent.WaitOne();
    var foo = Pool.FirstOrDefault(p => !p.InUse);
    if (foo != null)
    {
        foo.InUse = true;
        autoEvent.Set();
        return foo;
    }
    else if (Pool.Count < Capacity)
    {
        System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
        foo = new Foo() { InUse = true };
        Pool.Add(foo);
        autoEvent.Set();
        return foo;
    }
    else
    {
        return GetFooFromPool();
    }
}

public void ReleaseFoo(Foo p)
{
    p.InUse = false;
    autoEvent.Set();
}

这个想法是,当您呼叫GetFooFromPool时,您会等到收到信号后,再尝试查找未使用的现有Foo.如果找到一个,我们将其设置为InUse,然后发出一个信号,以便其他线程可以继续.如果找不到一个,则检查池是否已满.如果不是,我们创建一个新的Foo,将其添加到池中并再次发出信号.如果这两个条件都不满足,请再次调用GetFooFromPool让我们再次等待.

The idea is when you call GetFooFromPool, you wait until signaled, then you try and find an existing Foo that is not in use. If you find one, we set it to InUse and then fire a signal so other threads can proceed. If we don't find one, we check to see if the the pool is full. If not, we create a new Foo, add it to the pool and signal again. If neither of those conditions are satisfied, we are made to wait again by calling GetFooFromPool again.

现在在ReleaseFoo中,我们只是将InUse设置为false,并发信号通知在GetFooFromPool中等待下一个线程(如果有)尝试获取Foo.

Now in ReleaseFoo we just set InUse back to false, and signal the next thread waiting in GetFooFromPool (if any) to try and get a Foo.

问题似乎出在我管理池的大小上.容量为5,我最后得到的是6 Foo s.我可以在调试行中看到count 0出现了几次,而count 1也可能出现了几次.很明显,据我所知,我有多个线程无法进入它们.

The problem seems to be in my managing the size of the pool. With a capacity of 5, I'm ending up with 6 Foos. I can see in my debug line count 0 appear a couple of times and count 1 might appear a couple of times also. So clearly I have multiple threads getting into the block when, as far as I can see, they shouldn't be able to.

我在做什么错了?

像这样的双重检查锁:

else if (Pool.Count < Capacity)
{
    lock(locker)
    {
        if (Pool.Count < Capacity)
        {
            System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
            foo = new Foo() { InUse = true };
            Pool.Add(foo);
            autoEvent.Set();
            return foo;
        }
    }
} 

似乎可以解决问题,但是我不确定这是解决问题的最优雅的方法.

Does seem to fix the problem, but I'm not sure it's the most elegant way to do it.

推荐答案

正如评论中已经提到的,计数信号量是您的朋友. 将此与并发堆栈结合起来,您将获得一个很好的简单,线程安全的实现,您仍然可以在这里懒惰地分配池项.

As was already mentioned in the comments, a counting semaphore is your friend. Combine this with a concurrent stack and you have got a nice simple, thread safe implementation, where you can still lazily allocate your pool items.

下面的准系统实现提供了这种方法的示例.请注意,这里的另一个优点是您不需要使用InUse成员作为标记来跟踪事物的污染"池项目.

The bare-bones implementation below provides an example of this approach. Note that another advantage here is that you do not need to "contaminate" your pool items with an InUse member as a flag to track stuff.

请注意,作为微优化,在这种情况下,栈优先于队列,因为它会提供池中最近返回的实例,例如L1缓存.

Note that as a micro-optimization, a stack is preferred over a queue in this case, because it will provide the most recently returned instance from the pool, that may still be in e.g. L1 cache.

public class GenericConcurrentPool<T> : IDisposable where T : class
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentStack<T> _itemsStack;
    private readonly Action<T> _onDisposeItem;
    private readonly Func<T> _factory;

    public GenericConcurrentPool(int capacity, Func<T> factory, Action<T> onDisposeItem = null)
    {
        _itemsStack = new ConcurrentStack<T>(new T[capacity]);
        _factory = factory;
        _onDisposeItem = onDisposeItem;
        _sem = new SemaphoreSlim(capacity);
    }

    public async Task<T> CheckOutAsync()
    {
        await _sem.WaitAsync();
        return Pop();
    }

    public T CheckOut()
    {
        _sem.Wait();
        return Pop();
    }

    public void CheckIn(T item)
    {
        Push(item);
        _sem.Release();
    }

    public void Dispose()
    {
        _sem.Dispose();
        if (_onDisposeItem != null)
        {
            T item;
            while (_itemsStack.TryPop(out item))
            {
                if (item != null)
                    _onDisposeItem(item);
            }
        }
    }

    private T Pop()
    {
        T item;
        var result = _itemsStack.TryPop(out item);
        Debug.Assert(result);
        return item ?? _factory();
    }

    private void Push(T item)
    {
        Debug.Assert(item != null);
        _itemsStack.Push(item);
    }
}

这篇关于正确的方法来实现资源池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆