如何使用 invokeAll() 让所有线程池完成他们的任务? [英] How to use invokeAll() to let all thread pool do their task?

查看:90
本文介绍了如何使用 invokeAll() 让所有线程池完成他们的任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

    ExecutorService pool=Executors.newFixedThreadPool(7);
        List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
        List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

        for(int i=0;i<=diff;i++){

            String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);

            callList.add(new HotelCheapestFare(str));

        }       
     future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){

        System.out.println("name is:"+future.get(i).get().getName());
    }

现在我希望池在进入 for 循环之前 invokeAll 所有任务但是当我运行这个程序时 for 循环在 invokeAll 之前被执行并抛出这个异常:

Now I want pool to invokeAll all the task before getting to the for loop but when I run this program for loop gets executed before that invokeAll and throws this exception:

java.util.concurrent.ExecutionException: java.lang.NullPointerException at 
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at  
java.util.concurrent.FutureTask.get(Unknown Source) at 
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5)

Caused by: java.lang.NullPointerException at 
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) 
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source)
at java.lang.Thread.run

推荐答案

ExecutorService 的工作方式是,当您调用 invokeAll 时,它会等待所有任务完成:

The way an ExecutorService works is that when you call invokeAll it waits for all tasks to complete:

执行给定的任务,返回持有它们的 Futures 的列表全部完成后的状态和结果.Future.isDone() 对每个都为真返回列表的元素.请注意,已完成的任务可能具有正常终止或抛出异常.结果如果给定的集合被修改,则此方法未定义此操作正在进行中.1(强调)

Executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. Note that a completed task could have terminated either normally or by throwing an exception. The results of this method are undefined if the given collection is modified while this operation is in progress.1(emphasis added)

这意味着您的任务都已完成,但有些任务可能会引发异常.此异常是 Future 的一部分 - 调用 get 会导致在 ExecutionException 中重新抛出异常.

What that means is that your tasks are all done but some may have thrown an exception. This exception is part of the Future - calling get causes the exception to be rethrown wrapped in an ExecutionException.

来自您的堆栈跟踪

java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at 
                                ^^^ <-- from get

您可以看到确实如此.您的一项任务因 NPE 而失败.ExecutorService 捕获到异常,并通过在调用 Future.get 时抛出 ExecutionException 来告诉您.

You can see that this is indeed the case. One of your tasks has failed with an NPE. The ExecutorService caught the exception and is telling you about it by throwing an ExecutionException when you call Future.get.

现在,如果您想在任务完成时处理,您需要一个 ExecutorCompletionService.这就像一个 BlockingQueue,允许您在任务完成时轮询它们.

Now, if you want to take tasks as they complete you need an ExecutorCompletionService. This acts as a BlockingQueue that will allow you to poll for tasks as and when they finish.

public static void main(String[] args) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                } catch (InterruptedException ex) {
                    return;
                } catch (ExecutionException ex) {
                    System.err.println("TASK FAILED");
                }
            }
        }
    });
    for (int i = 0; i < 100; ++i) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("FAILED");
                }
                return "SUCCESS";
            }
        });
    }
    executorService.shutdown();
}

在这个例子中,我有一个任务在 ExecutorCompletionService 上调用 take ,它在 Future 可用时获取它们,然后我提交ExecutorCompletionService 的任务.

In this example I have one task that calls take on the ExecutorCompletionService which gets the Futures as they become available and then I submit tasks to the ExecutorCompletionService.

这将允许您在失败后立即获取失败的任务,而不必等待所有任务一起失败或成功.

This will allow you to get the failed task as soon as it fails rather than having to wait for all the tasks to either fail of succeed together.

唯一的复杂之处在于很难告诉轮询线程所有任务都已完成,因为现在一切都是异步的.在本例中,我使用了将提交 100 个任务的知识,因此它只需要轮询 100 次.更通用的方法是从 submit 方法中收集 Future ,然后循环它们以查看是否一切都已完成.

The only complication is that it is difficult to tell the polling thread that all the tasks are done as everything is now asynchronous. In this instance I have used the knowledge that 100 tasks will be submitted so that it only need to poll 100 times. A more general way would be to collect the Futures from the submit method as well and then loop over them to see if everything is completed.

这篇关于如何使用 invokeAll() 让所有线程池完成他们的任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆