带有ArrayBlockingQueue的ThreadPoolExecutor [英] ThreadPoolExecutor with ArrayBlockingQueue

查看:103
本文介绍了带有ArrayBlockingQueue的ThreadPoolExecutor的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我开始从Java Doc中阅读有关ThreadPoolExecutor的更多信息,因为我正在其中一个项目中使用它.因此,谁能解释这行的实际含义吗?-我知道每个参数代表什么,但是我想从这里的一些专家那里以更一般/通俗易懂的方式来理解它.

I started reading more about ThreadPoolExecutor from Java Doc as I am using it in one of my project. So Can anyone explain me what does this line means actually?- I know what does each parameter stands for, but I wanted to understand it in more general/lay-man way from some of the experts here.

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy());

已更新:- 问题陈述是:-

每个线程使用1到1000之间的唯一ID,并且程序必须运行60分钟或更长时间,因此在60分钟内,所有ID都可能已完成,因此我需要再次使用这些ID.这是我使用上面的执行程序编写的下面的程序.

Each thread uses unique ID between 1 and 1000 and program has to run for 60 minutes or more, So in that 60 minutes it is possible that all the ID's will get finished so I need to reuse those ID's again. So this is the below program I wrote by using above executor.

class IdPool {
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        someMethod(id);
        idPool.releaseExistingId(id);
    }

// This method needs to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " +id);
// and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

我的问题是:-就性能而言,此代码是正确的吗?我还能在这里做些什么来使其更准确?任何帮助将不胜感激.

My Questions is:- This code is right as far as the Performance is considered or not? And what else I can make it here to make it more accurate? Any help will be appreciated.

推荐答案

[首先,我很抱歉,这是对先前答案的答复,但我想格式化].

[First, i apologize, this is a response to a previous answer, but i wanted formatting].

实际上,除非您将项目提交到具有完整队列的ThreadPoolExecutor中,否则您不会阻塞.这样做的原因是ThreadPoolExecutor调用了BlockingQueue.offer(T item)方法,根据定义,该方法是非阻塞方法.它要么添加项目并返回true,要么不添加(已满时)并返回false.然后,ThreadPoolExecutor调用已注册的RejectedExecutionHandler来处理这种情况.

Except in reality, you DON'T block when an item is submitted to a ThreadPoolExecutor with a full queue. The reason for this is that ThreadPoolExecutor calls the BlockingQueue.offer(T item) method which by definition is a non-blocking method. It either adds the item and returns true, or does not add (when full) and returns false. The ThreadPoolExecutor then calls the registered RejectedExecutionHandler to deal with this situation.

来自javadoc:

在将来的某个时间执行给定的任务.任务可以执行 在新线程或现有池线程中.如果任务不能 提交执行,原因之一是该执行者已被执行 关闭或已达到其容量,则任务已处理 通过当前的RejectedExecutionHandler.

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

默认情况下,使用ThreadPoolExecutor.AbortPolicy()从ThreadPoolExecutor的提交"或执行"方法中抛出RejectedExecutionException.

By default, the ThreadPoolExecutor.AbortPolicy() is used which throws a RejectedExecutionException from the "submit" or "execute" method of the ThreadPoolExecutor.

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you're using the AbortPolicy as the 
   // RejectedExecutionHandler
}

但是,您可以使用其他处理程序执行不同的操作,例如忽略错误(DiscardPolicy),或在调用执行"或提交"方法的线程中运行它(CallerRunsPolicy).此示例使队列已满时,无论哪个调用"submit"或"execute"方法的线程都运行请求的任务. (这意味着在任何给定的时间,您可以在池本身的内容之上再运行一件事):

However, you can use other handlers to do something different, such as ignore the error (DiscardPolicy), or run it in the thread which called the "execute" or "submit" method (CallerRunsPolicy). This example lets whichever thread calls the "submit" or "execute" method run the requested task when the queue is full. (this means at any given time, you could 1 additional thing running on top of what's in the pool itself):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());

如果要阻塞并等待,可以实现自己的RejectedExecutionHandler,该阻塞将一直阻塞直到队列上有可用插槽为止(这是一个粗略的估计,我尚未编译或运行此消息,但您应该明白这一点) :

If you want to block and wait, you could implement your own RejectedExecutionHandler which would block until there's a slot available on the queue (this is a rough estimate, i have not compiled or run this, but you should get the idea):

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (e.isTerminated() || e.isShutdown()) {
        return;
     }

     boolean submitted = false;
     while (! submitted) {
       if (Thread.currentThread().isInterrupted()) {
            // be a good citizen and do something nice if we were interrupted
            // anywhere other than during the sleep method.
       }

       try {
          e.execute(r);
          submitted = true;
       }
       catch (RejectedExceptionException e) {
         try {
           // Sleep for a little bit, and try again.
           Thread.sleep(100L);
         }
         catch (InterruptedException e) {
           ; // do you care if someone called Thread.interrupt?
           // if so, do something nice here, and maybe just silently return.
         }
       }
     }
  }
}

这篇关于带有ArrayBlockingQueue的ThreadPoolExecutor的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆