Java:在循环中等待直到ThreadPoolExecutor的任务完成,然后再继续 [英] Java: Wait in a loop until tasks of ThreadPoolExecutor are done before continuing

查看:276
本文介绍了Java:在循环中等待直到ThreadPoolExecutor的任务完成,然后再继续的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在努力使Dijkstra算法并行化.使每个节点线程查看当前节点的所有边缘.这是与线程并行执行的,但是开销太大.这比顺序版本的算法要花费更长的时间.

I'm working on making the Dijkstra algorithm parallel. Per node threads are made to look at all the edges of the current node. This was made parallel with threads but there is too much overhead. This resulted in a longer time than the sequential version of the algorithm.

添加了ThreadPool来解决此问题,但是我无法等待任务完成才可以继续进行下一个迭代.只有在完成一个节点的所有任务之后,我们才应该继续进行.我们需要所有任务的结果,然后才能按节点搜索下一个最接近的对象.

ThreadPool was added to solve this problem but i'm having trouble with waiting until the tasks are done before I can move on to the next iteration. Only after all tasks for one node is done we should move on. We need the results of all tasks before I can search for the next closest by node.

我尝试做executor.shutdown(),但是有了这个方法,它不会接受新任务.我们如何在循环中等待直到每个任务完成,而不必每次都声明ThreadPoolExecutor.通过使用此线程而不是常规线程,这样做将无法达到减少开销的目的.

I tried doing executor.shutdown() but with this aproach it won't accept new tasks. How can we wait in the loop until every task is finished without having to declare the ThreadPoolExecutor every time. Doing this will defeat the purpose of the less overhead by using this instead of regular threads.

我想到的一件事是添加任务(边)的BlockingQueue.但对于这种解决方案,我也坚持等待任务完成而无需shudown().

One thing I thought about was an BlockingQueue that add the tasks(edges). But also for this solution i'm stuck on waiting for tasks to finish without shudown().

public void apply(int numberOfThreads) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads);

        class DijkstraTask implements Runnable {

            private String name;

            public DijkstraTask(String name) {
                this.name = name;
            }

            public String getName() {
                return name;
            }

            @Override
            public void run() {
                calculateShortestDistances(numberOfThreads);
            }
        }

        // Visit every node, in order of stored distance
        for (int i = 0; i < this.nodes.length; i++) {

            //Add task for each node
            for (int t = 0; t < numberOfThreads; t++) {
                executor.execute(new DijkstraTask("Task " + t));
            }

            //Wait until finished?
            while (executor.getActiveCount() > 0) {
                System.out.println("Active count: " + executor.getActiveCount());
            }

            //Look through the results of the tasks and get the next node that is closest by
            currentNode = getNodeShortestDistanced();

            //Reset the threadCounter for next iteration
            this.setCount(0);
        }
    }

边的数量除以线程数.因此,8个边和2个线程意味着每个线程将并行处理4个边.

The amount of edges is divided by the number of threads. So 8 edges and 2 threads means each thread will deal with 4 edges in parallel.

public void calculateShortestDistances(int numberOfThreads) {

        int threadCounter = this.getCount();
        this.setCount(count + 1);

        // Loop round the edges that are joined to the current node
        currentNodeEdges = this.nodes[currentNode].getEdges();

        int edgesPerThread = currentNodeEdges.size() / numberOfThreads;
        int modulo = currentNodeEdges.size() % numberOfThreads;
        this.nodes[0].setDistanceFromSource(0);
        //Process the edges per thread
        for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter + 1)); joinedEdge++) {

            System.out.println("Start: " + (edgesPerThread * threadCounter) + ". End: " + (edgesPerThread * (threadCounter + 1) + ".JoinedEdge: " + joinedEdge) + ". Total: " + currentNodeEdges.size());
            // Determine the joined edge neighbour of the current node
            int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);

            // Only interested in an unvisited neighbour
            if (!this.nodes[neighbourIndex].isVisited()) {
                // Calculate the tentative distance for the neighbour
                int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
                // Overwrite if the tentative distance is less than what's currently stored
                if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
                    nodes[neighbourIndex].setDistanceFromSource(tentative);
                }
            }
        }

        //if we have a modulo above 0, the last thread will process the remaining edges
        if (modulo > 0 && numberOfThreads == (threadCounter + 1)) {
            for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter) + modulo); joinedEdge++) {
                // Determine the joined edge neighbour of the current node
                int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);

                // Only interested in an unvisited neighbour
                if (!this.nodes[neighbourIndex].isVisited()) {
                    // Calculate the tentative distance for the neighbour
                    int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
                    // Overwrite if the tentative distance is less than what's currently stored
                    if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
                        nodes[neighbourIndex].setDistanceFromSource(tentative);
                    }
                }
            }
        }
        // All neighbours are checked so this node is now visited
        nodes[currentNode].setVisited(true);
    }

感谢您的帮助!

推荐答案

您应该查看CyclicBarrierCountDownLatch.这两种方法都可以防止线程启动,除非其他线程已发出信号表明线程已完成.它们之间的区别在于CyclicBarrier是可重用的,即可以多次使用,而CountDownLatch是一次性的,您不能重置计数.

You should look into CyclicBarrier or a CountDownLatch. Both of these allow you to prevent threads starting unless other threads have signaled that they're done. The difference between them is that CyclicBarrier is reusable, i.e. can be used multiple times, while CountDownLatch is one-shot, you cannot reset the count.

Javadocs中的措辞:

Paraphrasing from the Javadocs:

CountDownLatch 是一种同步辅助工具,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止.

A CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

CyclicBarrier 是一种同步辅助工具,它允许一组线程互相等待,以到达一个公共的障碍点. CyclicBarriers在涉及固定大小的线程方的程序中很有用,该线程方有时必须互相等待.该屏障称为循环屏障,因为它可以在释放等待线程之后重新使用.

A CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

https://docs.oracle.com/zh_CN/java/javase/11/docs/api/java.base/java/util/concurrent/CyclicBarrier.html

https://docs.oracle.com/zh_CN/java/javase/11/docs/api/java.base/java/util/concurrent/CountDownLatch.html

这篇关于Java:在循环中等待直到ThreadPoolExecutor的任务完成,然后再继续的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆