如何使用invokeAll()让所有线程池完成他们的任务? [英] How to use invokeAll() to let all thread pool do their task?

查看:227
本文介绍了如何使用invokeAll()让所有线程池完成他们的任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

    ExecutorService pool=Executors.newFixedThreadPool(7);
        List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
        List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

        for(int i=0;i<=diff;i++){

            String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);

            callList.add(new HotelCheapestFare(str));

        }       
     future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){

        System.out.println("name is:"+future.get(i).get().getName());
    }

现在我想要游泳池 invokeAll 在进入for循环之前的所有任务,但是当我运行此程序for循环时,在 invokeAll 之前执行并抛出此异常:

Now I want pool to invokeAll all the task before getting to the for loop but when I run this program for loop gets executed before that invokeAll and throws this exception:

java.util.concurrent.ExecutionException: java.lang.NullPointerException at 
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at  
java.util.concurrent.FutureTask.get(Unknown Source) at 
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5)

Caused by: java.lang.NullPointerException at 
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) 
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source)
at java.lang.Thread.run


推荐答案

ExecutorService 的工作原理是,当你调用 invokeAll 时,它会等待所有任务完成:

The way an ExecutorService works is that when you call invokeAll it waits for all tasks to complete:


执行给定任务,返回一个持有
状态的Futures列表,并在完成后返回结果。 Future.isDone()对于返回列表的每个
元素都为true。 请注意,已完成的任务可能会正常终止
或抛出异常

这个方法的结果是未定义的,如果给定的集合被修改而
这个操作正在进行中。 1 (强调添加)

Executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. Note that a completed task could have terminated either normally or by throwing an exception. The results of this method are undefined if the given collection is modified while this operation is in progress.1(emphasis added)

这意味着你的任务都已完成,但有些人可能抛出异常。此异常是 Future 的一部分 - 调用 get 会导致异常重新包装在 ExecutionException 。

What that means is that your tasks are all done but some may have thrown an exception. This exception is part of the Future - calling get causes the exception to be rethrown wrapped in an ExecutionException.

来自你的堆栈跟踪

java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at 
                                ^^^ <-- from get

你可以看到确实如此。您的一项任务因NPE而失败。 ExecutorService 捕获了异常并通过在调用时抛出 ExecutionException 来告诉您Future.get

You can see that this is indeed the case. One of your tasks has failed with an NPE. The ExecutorService caught the exception and is telling you about it by throwing an ExecutionException when you call Future.get.

现在,如果您想完成任务,则需要 ExecutorCompletionService 。这可以作为 BlockingQueue ,它允许您在完成任务时轮询任务。

Now, if you want to take tasks as they complete you need an ExecutorCompletionService. This acts as a BlockingQueue that will allow you to poll for tasks as and when they finish.

public static void main(String[] args) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                } catch (InterruptedException ex) {
                    return;
                } catch (ExecutionException ex) {
                    System.err.println("TASK FAILED");
                }
            }
        }
    });
    for (int i = 0; i < 100; ++i) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("FAILED");
                }
                return "SUCCESS";
            }
        });
    }
    executorService.shutdown();
}

在这个例子中,我有一个调用的任务 ExecutorCompletionService 上的获取 Future ,因为它们可用,然后我提交任务到 ExecutorCompletionService

In this example I have one task that calls take on the ExecutorCompletionService which gets the Futures as they become available and then I submit tasks to the ExecutorCompletionService.

这将允许您在失败时立即获取失败的任务,而不是必须等待所有任务一起成功失败。

This will allow you to get the failed task as soon as it fails rather than having to wait for all the tasks to either fail of succeed together.

唯一的复杂因素是很难告诉轮询线程所有任务都已完成,因为现在一切都已完成异步。在这个例子中,我使用了将提交100个任务的知识,因此它只需要轮询100次。更通用的方法是从 submit 方法中收集 Future ,然后循环它们以查看如果一切都完成了。

The only complication is that it is difficult to tell the polling thread that all the tasks are done as everything is now asynchronous. In this instance I have used the knowledge that 100 tasks will be submitted so that it only need to poll 100 times. A more general way would be to collect the Futures from the submit method as well and then loop over them to see if everything is completed.

这篇关于如何使用invokeAll()让所有线程池完成他们的任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆