什么决定了 Java ForkJoinPool 创建的线程数? [英] What determines the number of threads a Java ForkJoinPool creates?

查看:49
本文介绍了什么决定了 Java ForkJoinPool 创建的线程数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

据我所知 ForkJoinPool,该池创建固定数量的线程(默认:核心数)并且永远不会创建更多线程(除非应用程序通过使用表明需要这些线程)managedBlock).

As far as I had understood ForkJoinPool, that pool creates a fixed number of threads (default: number of cores) and will never create more threads (unless the application indicates a need for those by using managedBlock).

然而,使用 ForkJoinPool.getPoolSize() 我发现在创建 30,000 个任务 (RecursiveAction) 的程序中,ForkJoinPool 执行这些任务任务平均使用 700 个线程(每次创建任务时计算线程数).这些任务不做 I/O,而是纯计算;唯一的任务间同步是调用 ForkJoinTask.join() 并访问 AtomicBooleans,即没有线程阻塞操作.

However, using ForkJoinPool.getPoolSize() I discovered that in a program that creates 30,000 tasks (RecursiveAction), the ForkJoinPool executing those tasks uses 700 threads on average (threads counted each time a task is created). The tasks don't do I/O, but pure computation; the only inter-task synchronization is calling ForkJoinTask.join() and accessing AtomicBooleans, i.e. there are no thread-blocking operations.

由于 join() 没有按照我的理解阻塞调用线程,所以池中的任何线程都没有理由阻塞,因此(我假设)应该没有有理由创建任何其他线程(尽管这显然正在发生).

Since join() does not block the calling thread as I understand it, there is no reason why any thread in the pool should ever block, and so (I had assumed) there should be no reason to create any further threads (which is obviously happening nevertheless).

那么,为什么 ForkJoinPool 会创建这么多线程?哪些因素决定了创建的线程数量?

So, why does ForkJoinPool create so many threads? What factors determine the number of threads created?

我曾希望无需发布代码即可回答此问题,但这里应要求提供.这段代码是从一个四倍大小的程序中摘录的,缩减为基本部分;它不会按原样编译.如果需要,我当然也可以发布完整的程序.

I had hoped that this question could be answered without posting code, but here it comes upon request. This code is an excerpt from a program of four times the size, reduced to the essential parts; it does not compile as it is. If desired, I can of course post the full program, too.

该程序使用深度优先搜索在迷宫中搜索从给定起点到给定终点的路径.保证存在解决方案.主要逻辑在 SolverTaskcompute() 方法中:一个 RecursiveAction 从某个给定点开始,并继续所有可到达的相邻点当前点.它不是在每个分支点创建一个新的 SolverTask(这会创建太多任务),而是将除一个之外的所有邻居推入回溯堆栈以供稍后处理,并且仅继续一个邻居未被推入到堆栈.一旦它以这种方式到达死胡同,最近推入回溯堆栈的点就会弹出,并从那里继续搜索(相应地削减从 taks 的起点构建的路径).一旦任务发现其回溯堆栈大于某个阈值,就会创建一个新任务;从那时起,任务在继续从其回溯堆栈中弹出直到耗尽时,在到达分支点时不会将任何进一步的点推入其堆栈,而是为每个这样的点创建一个新任务.因此,可以使用堆栈限制阈值来调整任务的大小.

The program searches a maze for a path from a given start point to a given end point using depth-first search. A solution is guaranteed to exist. The main logic is in the compute() method of SolverTask: A RecursiveAction that starts at some given point and continues with all neighbor points reachable from the current point. Rather than creating a new SolverTask at each branching point (which would create far too many tasks), it pushes all neighbors except one onto a backtracking stack to be processed later and continues with only the one neighbor not pushed to the stack. Once it reaches a dead end that way, the point most recently pushed to the backtracking stack is popped, and the search continues from there (cutting back the path built from the taks's starting point accordingly). A new task is created once a task finds its backtracking stack larger than a certain threshold; from that time, the task, while continuing to pop from its backtracking stack until that is exhausted, will not push any further points to its stack when reaching a branching point, but create a new task for each such point. Thus, the size of the tasks can be adjusted using the stack limit threshold.

我上面引用的数字(平均 30,000 个任务,700 个线程")来自搜索 5000x5000 单元的迷宫.所以,这里是基本代码:

The numbers I quoted above ("30,000 tasks, 700 threads on average") are from searching a maze of 5000x5000 cells. So, here is the essential code:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}

推荐答案

stackoverflow上有相关问题:

There're related questions on stackoverflow:

在 invokeAll/join 期间 ForkJoinPool 停顿

ForkJoinPool 似乎浪费了一个线程

我制作了一个可运行的精简版本(我使用的 jvm 参数:-Xms256m -Xmx1024m -Xss8m):

I made a runnable stripped down version of what is happening (jvm arguments i used: -Xms256m -Xmx1024m -Xss8m):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class Test1 {

    private static ForkJoinPool pool = new ForkJoinPool(2);

    private static class SomeAction extends RecursiveAction {

        private int counter;         //recursive counter
        private int childrenCount=80;//amount of children to spawn
        private int idx;             // just for displaying

        private SomeAction(int counter, int idx) {
            this.counter = counter;
            this.idx = idx;
        }

        @Override
        protected void compute() {

            System.out.println(
                "counter=" + counter + "." + idx +
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());
            if (counter <= 0) return;

            List<SomeAction> list = new ArrayList<>(childrenCount);
            for (int i=0;i<childrenCount;i++){
                SomeAction next = new SomeAction(counter-1,i);
                list.add(next);
                next.fork();
            }


            for (SomeAction action:list){
                action.join();
            }
        }
    }

    public static void main(String[] args) throws Exception{
        pool.invoke(new SomeAction(2,0));
    }
}

显然,当您执行连接时,当前线程看到所需的任务尚未完成,并需要自己完成另一个任务.

Apparently when you perform a join, current thread sees that required task is not yet completed and takes another task for himself to do.

它发生在 java.util.concurrent.ForkJoinWorkerThread#joinTask 中.

然而,这个新任务产生了更多相同的任务,但它们在池中找不到线程,因为线程被锁定在连接中.并且由于它无法知道释放它们需要多长时间(线程可能处于无限循环或永远死锁),因此产生了新线程(补偿加入的线程为 Louis Wasserman 提到):java.util.concurrent.ForkJoinPool#signalWork

However this new task spawns more of the same tasks, but they can not find threads in the pool, because threads are locked in join. And since it has no way to know how much time it will require for them to be released (thread could be in infinite loop or deadlocked forever), new thread(s) is(are) spawned (Compensating for joined threads as Louis Wasserman mentioned): java.util.concurrent.ForkJoinPool#signalWork

所以为了防止这种情况,你需要避免任务的递归产生.

So to prevent such scenario you need to avoid recursive spawning of tasks.

例如,如果在上面的代码中您将初始参数设置为 1,则活动线程数量将为 2,即使您将 childrenCount 增加十倍.

For example if in above code you set initial parameter to 1, active thread amount will be 2, even if you increase childrenCount tenfold.

另请注意,当活动线程数量增加时,运行线程数量小于或等于并行度.

Also note that, while amount of active threads increases, amount of running threads is less or equal to parallelism.

这篇关于什么决定了 Java ForkJoinPool 创建的线程数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆