Testing PriorityBlockingQueue in ThreadPoolExecutor
Question
I implemented my ThreadPoolExecutor with a PriorityBlockingQueue as in this example: https://stackoverflow.com/a/12722648/2206775
And I wrote a test:
PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(16);
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            Thread.sleep(1000);
            System.out.println("1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}, 1);
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            Thread.sleep(1000);
            System.out.println("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}, 3);
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            Thread.sleep(1000);
            System.out.println("2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}, 2);
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            Thread.sleep(1000);
            System.out.println("5");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}, 5);
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            Thread.sleep(1000);
            System.out.println("4");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}, 4);
executorService.shutdown();
try {
    executorService.awaitTermination(30, TimeUnit.MINUTES);
} catch (InterruptedException e) {
    e.printStackTrace();
}
But in the end I don't get 1 2 3 4 5; I get those numbers in a random order. Is there a problem with the test, or is something else wrong? And if it is the test, how can this be tested correctly?
Recommended answer
The priority is only taken into account when the pool is fully busy and you submit several new tasks, because only tasks that have to wait in the queue get ordered. If you define your pool with only one thread, you should get the expected output. In your example there are 16 threads for 5 tasks, so all tasks are executed concurrently and which one finishes first is somewhat random.
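One way to make such a test deterministic, sketched under a few assumptions: use a single worker thread, park it on a latch so that every prioritized task has to wait in the queue, then release the latch and record the execution order. The names `PriorityOrderDemo`, `PrioritizedTask`, and `runOnce` are hypothetical and not part of the linked implementation; the sketch calls `execute` on a plain `ThreadPoolExecutor` so that the tasks reach the queue unwrapped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityOrderDemo {

    /** Hypothetical helper: a Runnable that carries a priority and is Comparable. */
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority); // lower value runs first
        }
    }

    /** Queues five tasks behind a blocked worker and returns the order in which they ran. */
    static List<Integer> runOnce() {
        final List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
        final CountDownLatch gate = new CountDownLatch(1);

        // The first task is handed straight to the only worker and keeps it busy.
        pool.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // The worker is busy, so all of these wait in the priority queue.
        for (final int p : new int[] {3, 1, 5, 2, 4}) {
            pool.execute(new PrioritizedTask(p, () -> { order.add(p); }));
        }

        gate.countDown(); // release the worker; it now drains the queue by priority
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // [1, 2, 3, 4, 5]
    }
}
```

Because the tasks are submitted out of order and only start once the latch opens, the recorded order reflects the queue's ordering rather than submission order or thread timing.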
By the way, the linked implementation has a problem and throws an exception if your queue is full and you submit new tasks.
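The answer does not name the exception. One failure that does occur with a PriorityBlockingQueue-backed pool in general, and that may be what is meant here, is a ClassCastException when a task that is not Comparable has to be queued: `submit` wraps the task in a plain `FutureTask`, which the queue cannot order. The sketch below is an illustration under that assumption; `QueuedTaskCastDemo` and `submitWhileBusy` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueuedTaskCastDemo {

    /** Returns the simple name of the exception thrown by submit(), or "none". */
    static String submitWhileBusy() {
        ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
        final CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable noop = () -> { };
        try {
            // Goes straight to the only worker thread, bypassing the queue.
            pool.execute(blocker);
            // The worker is busy, so this task must be queued. submit() wraps it
            // in a plain FutureTask, which does not implement Comparable.
            pool.submit(noop);
            return "none";
        } catch (ClassCastException e) {
            return e.getClass().getSimpleName();
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitWhileBusy());
    }
}
```

With idle threads available the same `submit` call succeeds, which is why the problem only shows up once tasks start to queue.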
See below for a working example of what you are trying to achieve (I have overridden newTaskFor in a simplistic way, just to make it work; you might want to improve that part).
It prints: 1 2 3 4 5
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {

    public static void main(String[] args) {
        // One worker thread: task "1" starts at once, the others wait in the priority queue.
        PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1);
        executorService.submit(getRunnable("1"), 1);
        executorService.submit(getRunnable("3"), 3);
        executorService.submit(getRunnable("2"), 2);
        executorService.submit(getRunnable("5"), 5);
        executorService.submit(getRunnable("4"), 4);

        executorService.shutdown();
        try {
            executorService.awaitTermination(30, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static Runnable getRunnable(final String id) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(id);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    static class PriorityExecutor extends ThreadPoolExecutor {

        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        // Utility method to create a thread pool easily
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new PriorityExecutor(nThreads, nThreads, 0L,
                    TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        }

        // Submit with a new comparable task
        public Future<?> submit(Runnable task, int priority) {
            return super.submit(new ComparableFutureTask<Void>(task, null, priority));
        }

        // Execute with a new comparable task
        public void execute(Runnable command, int priority) {
            super.execute(new ComparableFutureTask<Void>(command, null, priority));
        }

        // Simplistic: assumes the task is already a RunnableFuture, as created by
        // submit(task, priority) above. A plain Callable would fail this cast.
        @Override
        @SuppressWarnings("unchecked")
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return (RunnableFuture<T>) callable;
        }

        // Same simplification for Runnable: return the ComparableFutureTask itself
        // instead of wrapping it in a non-comparable FutureTask.
        @Override
        @SuppressWarnings("unchecked")
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return (RunnableFuture<T>) runnable;
        }
    }

    static class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<T>> {

        volatile int priority = 0;

        public ComparableFutureTask(Runnable runnable, T result, int priority) {
            super(runnable, result);
            this.priority = priority;
        }

        public ComparableFutureTask(Callable<T> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        // Lower priority values are taken from the queue first.
        @Override
        public int compareTo(ComparableFutureTask<T> o) {
            return Integer.compare(priority, o.priority);
        }
    }
}
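As for improving the newTaskFor part, one possible direction is to stop casting and instead wrap every task in a comparable future, giving tasks submitted without a priority a default one. This is only a sketch and not the answer author's code: `SaferPriorityExecutor`, `PriorityTask`, `DEFAULT_PRIORITY`, and the choice to run unprioritized tasks last are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaferPriorityExecutor extends ThreadPoolExecutor {

    // Assumption for this sketch: tasks submitted without a priority run last.
    static final int DEFAULT_PRIORITY = Integer.MAX_VALUE;

    SaferPriorityExecutor(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
    }

    public Future<?> submit(Runnable task, int priority) {
        RunnableFuture<Void> ftask = new PriorityTask<Void>(task, null, priority);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task, int priority) {
        RunnableFuture<T> ftask = new PriorityTask<T>(task, priority);
        execute(ftask);
        return ftask;
    }

    // Plain submit(...) calls end up here, so they get a comparable wrapper too.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PriorityTask<T>(callable, DEFAULT_PRIORITY);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PriorityTask<T>(runnable, value, DEFAULT_PRIORITY);
    }

    static final class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<?>> {
        private final int priority;

        PriorityTask(Runnable runnable, T result, int priority) {
            super(runnable, result);
            this.priority = priority;
        }

        PriorityTask(Callable<T> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        @Override
        public int compareTo(PriorityTask<?> other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Hypothetical helper: a task that appends its label to the shared log. */
    static Runnable logging(final List<String> log, final String label) {
        return () -> { log.add(label); };
    }

    /** Queues mixed tasks behind a blocked single worker and returns the run order. */
    static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch gate = new CountDownLatch(1);
        SaferPriorityExecutor pool = new SaferPriorityExecutor(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Callable<String> callable = () -> { log.add("p3"); return "done"; };

        pool.submit(blocker, 0);            // occupies the only worker
        pool.submit(logging(log, "plain")); // no priority: wrapped with DEFAULT_PRIORITY
        pool.submit(logging(log, "p2"), 2);
        pool.submit(callable, 3);
        pool.submit(logging(log, "p1"), 1);

        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [p1, p2, p3, plain]
    }
}
```

Note that a plain `execute(Runnable)` call with a non-comparable task bypasses newTaskFor and would still fail once the task is queued, so this sketch only covers the `submit` family.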