具有有限队列的线程池 [英] Thread pool with bounded queue

查看:89
本文介绍了具有有限队列的线程池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经看到了线程池执行器的实现以及它提供的被拒绝的执行策略.但是,我有一个自定义要求-我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说当队列大小减少到最大允许队列大小的80%时发出通知.

I have seen the thread pool executor implementation and the rejected execution policies that it provides. However, I have a custom requirement - I want to have a call back mechanism where in I get notifications when the queue size limit is reached and say when the queue size reduces to say 80 % of the max allowed queue size.

public interface ISaturatedPoolObserver {
 void onSaturated(); // called when the blocking queue reaches the size limit
 void onUnsaturated(); // called when blocking queues size goes below the threshold.
}

我认为可以通过子类化线程池执行程序来实现,但是已经实现了吗?我很乐意在需要时提供更多细节和我的工作,以便提供清晰的信息.

I feel that this can be implemented by subclassing thread pool executor, but is there an already implemented version? I would be happy to add more details and my work so far as and when needed to provide clarity.

推荐答案

我想要一种回调机制,当达到队列大小限制时,我会收到通知...

I want to have a call back mechanism where in I get notifications when the queue size limit is reached...

我不会将执行器子类化,但会将执行器使用的BlockingQueue子类化.像下面这样的东西应该起作用.如果删除条目并将某人放回去,则checkUnsaturated()周围的代码中存在竞争条件.如果这些条件需要完善,则可能必须在队列上进行同步.另外,我也不知道执行器实现使用什么方法,因此您可能不需要覆盖其中的某些方法.

I wouldn't subclass the executor but I would subclass the BlockingQueue that is used by the executor. Something like the following should work. There are race conditions in the code around the checkUnsaturated() if you remove an entry and someone puts one back in. You might have to synchronize on the queue if these need to be perfect. Also, I have no idea what methods the executor implementations use so you might not need to override some of these.

public class ObservableBlockingQueue<E> extends LinkedBlockingQueue<E> {
     private ISaturatedPoolObserver observer;
     private int capacity;
     public ObservableBlockingQueue(ISaturatedPoolObserver observer,
         int capacity) {
         super(capacity);
         this.observer = observer;
         this.capacity = capacity;
    }
    @Override
    public boolean offer(E o) {
        boolean offered = super.offer(o);
        if (!offered) {
            observer.onSaturated();
        }
        return offered;
    }
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        boolean offered = super.offer(o, timeout, unit);
        if (!offered) {
            observer.onSaturated();
        }
        return offered;
    }
    @Override
    public E poll() {
        E e = super.poll();
        if (e != null) {
             checkUnsaturated();
        }
        return e;
    }
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = super.poll(timeout, unit);
        if (e != null) {
             checkUnsaturated();
        }
        return e;
    }
    @Override
    public E take() throws InterruptedException {
        E e = super.take();
        checkUnsaturated();
        return e;
    }
    @Override
    public boolean remove(E e) throws InterruptedException {
        boolean removed = super.remove(e);
        if (removed) {
            checkUnsaturated();
        }
        return removed;
    }
    private void checkUnsaturated() {
        if (super.size() * 100 / capacity < UNSATURATED_PERCENTAGE) {
            observer.onUnsaturated();
        }
    }
}

这篇关于具有有限队列的线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆