ThreadPoolExecutor和队列 [英] ThreadPoolExecutor and the queue

查看:115
本文介绍了ThreadPoolExecutor和队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我认为使用 ThreadPoolExecutor 我们可以提交 Runnable s在构造函数中传递的 BlockingQueue 中执行或使用执行方法。

另外我的理解是,如果任务可用,它将被执行。

我不明白的是以下:

I thought that using ThreadPoolExecutor we can submit Runnables to be executed either in the BlockingQueue passed in the constructor or using the execute method.
Also my understanding was that if a task is available it will be executed.
What I don't understand is the following:

public class MyThreadPoolExecutor {  

    private static ThreadPoolExecutor executor;  

    public MyThreadPoolExecutor(int min, int max, int idleTime, BlockingQueue<Runnable> queue){  
        executor = new ThreadPoolExecutor(min, max, 10, TimeUnit.MINUTES, queue);   
        //executor.prestartAllCoreThreads();  
    }  

    public static void main(String[] main){
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
        final String[] names = {"A","B","C","D","E","F"};  
        for(int i = 0; i < names.length; i++){  
            final int j = i;  
            q.add(new Runnable() {  

                @Override  
                public void run() {  
                    System.out.println("Hi "+ names[j]);  

                }  
            });         
        }  
        new MyThreadPoolExecutor(10, 20, 1, q);   
        try {  
            TimeUnit.SECONDS.sleep(5);  
        } catch (InterruptedException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
        /*executor.execute(new Runnable() {  

            @Override  
            public void run() {  

                System.out.println("++++++++++++++");  

            }   
        });  */
        for(int i = 0; i < 100; i++){  
            final int j = i;  
            q.add(new Runnable() {   

                @Override  
                public void run() {  
                    System.out.println("Hi "+ j);  

                }  
            });  
        }   


    }  


}

除非我在构造函数中取消注释 executor.prestartAllCoreThreads(); ,否则此代码不会执行任何操作,或者我调用执行打印 System.out.println(++++++++++++++++++++++++++++ $ c>(它也被注释掉了)。

This code does not do absolutely anything unless I either uncomment the executor.prestartAllCoreThreads(); in the constructor OR I call execute of the runnable that prints System.out.println("++++++++++++++"); (it is also commented out).

为什么?

引用(我的重点):

Why?
Quote (my emphasis):


默认情况下,最初创建核心线程,并且在新任务到达时仅启动
,但这可以使用
方法prestartCoreThread()或prestartAllCoreThreads()动态覆盖。如果构造具有非空
队列的池,则可能
想要预启动线程。

By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method prestartCoreThread() or prestartAllCoreThreads(). You probably want to prestart threads if you construct the pool with a non-empty queue.

好的。所以我的队列不是空的。但我创建执行者,我做 sleep 然后我添加新 Runnable 到队列(在循环中为100)。

这个循环不计算为新任务到达

为什么它不起作用我必须预启动或明确地调用执行

Ok. So my queue is not empty. But I create the executor, I do sleep and then I add new Runnables to the queue (in the loop to 100).
Doesn't this loop count as new tasks arrive?
Why doesn't it work and I have to either prestart or explicitely call execute?

推荐答案

当执行任务到达时,将生成工作线程,这些是与底层工作队列交互的线程。如果以非空工作队列开始,则需要预启动工作程序。请参阅OpenJDK 7中的实现

Worker threads are spawned as tasks arrive by execute, and these are the ones that interact with the underlying work queue. You need to prestart the workers if you begin with a non-empty work queue. See the implementation in OpenJDK 7.

我再说一遍,工作人员是与工作队列交互的人员。它们仅在通过执行传递时按需生成。 (或者它上面的层,例如 invokeAll 提交等)如​​果它们没有启动,它将不会你加入队列的工作量是多少,因为没有什么可以检查它没有工人开始

I repeat, the workers are the ones that interact with the work queue. They are only spawned on demand when passed via execute. (or the layers above it, e.g. invokeAll, submit, etc.) If they are not started, it will not matter how much work you add to the queue, since there is nothing checking it as there are no workers started.

ThreadPoolExecutor 在必要之前不会产生工作线程,或者如果你通过方法抢占它们的创建 prestartAllCoreThreads prestartCoreThread 。如果没有工作人员启动,那么你的队列中的任何工作都无法完成。

ThreadPoolExecutor does not spawn worker threads until necessary or if you pre-empt their creation by the methods prestartAllCoreThreads and prestartCoreThread. If there are no workers started, then there is no way any of the work in your queue is going to be done.

添加初始 execute 的工作原理是它强制创建一个唯一核心工作者线程,然后可以开始处理队列中的工作。您也可以调用 prestartCoreThread 并接收类似的行为。如果要启动所有工作者,则必须调用 prestartAllCoreThreads 或通过执行

The reason adding an initial execute works is that it forces the creation of a sole core worker thread, which then can begin processing the work from your queue. You could also call prestartCoreThread and receive similar behavior. If you want to start all the workers, you must call prestartAllCoreThreads or submit that number of tasks via execute.

请参阅下面的执行代码。

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

这篇关于ThreadPoolExecutor和队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆