How do I run something parallel in Java?

Question

I am trying to print all possible combinations within a range. For example if my lowerBound is 3 and my max is 5, I want the following combinations: (5,4 - 5,3 - 4,3). I've implemented this with the helper() function found below.

Of course, if my max is very big, this is a lot of combinations and it will take a long time. That's why I'm trying to use a ForkJoinPool, so that the tasks run in parallel. For this I create a new ForkJoinPool. Then I loop over all possible values of r (where r is the amount of numbers in the combination, in the above example r=3). For every value of r I create a new HelperCalculator, which extends RecursiveTask<Void>. In there I recursively call the helper() function. Every time I call this I create a new HelperCalculator and I use .fork() on that.

The problem is as follows. It is not correctly generating all possible combinations. It actually generates no combinations at all. I've tried adding calculator.join() after calculator.fork(), but that just goes on infinitely till I get an OutOfMemory error.

Obviously there is something I'm misunderstanding about the ForkJoinPool, but I can't see what anymore, after trying for days.

My main function:

            ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
            for (int r = 1; r < 25; r++) {
                int lowerBound = 7;
                int[] data = new int[r];
                int max = 25;
                calculator = new HelperCalculator(data, 0, max, 0, s, n, lowerBound);
                pool.execute(calculator);
                calculator.join();
            }
            pool.shutdown();

The HelperCalculator class:

    protected Void compute() {
        helper(data, end, start, index, s, lowerBound);
        return null;
    }

    //Generate all possible combinations
    public void helper(int[] data , int end, int start, int index,int s, int lowerBound) {
        //If the array is filled, print it
        if (index == data.length) {
                System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
            if(data[0] >= lowerBound) {
                HelperCalculator calculator = new HelperCalculator(data,end, start-1, index+1, s, n, lowerBound);
                calculator.fork();
                calculators.add(calculator);
                HelperCalculator calculator2 = new HelperCalculator(data, end, start-1, index, s, n, lowerBound);
                calculator2.fork();
                calculators.add(calculator2);
            }
        }
    }

How do I make every HelperCalculator run in parallel, so that there are 23 running at the same time using a ForkJoinPool? Or should I perhaps use a different solution?

I've tried calling join() and isDone() on the calculators list, but then it doesn't wait for it to finish properly and the program just exits.

Because someone doesn't understand the algorithm, here it is:

    public static void main(String[] args) {
            for(int r = 3; r > 0; r--) {
                int[] data = new int[r];
                helper(data, 0, 2, 0);
            }
    }

    public static void helper(int[] data , int end, int start, int index) {
        if (index == data.length) {
            System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
            helper(data, end, start - 1, index + 1);
            helper(data, end, start - 1, index);
        }
    }

This outputs:

[2, 1, 0]
[2, 1]
[2, 0]
[1, 0]
[2]
[1]
[0]

Recommended answer

Some of the tasks you are forking attempt to use the same array for evaluating different combinations. You can solve the issue by creating a distinct array for each task or by limiting the parallelism to those tasks which already have an array on their own, i.e. those with different length.
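The first option, a distinct array for each task, could look roughly like the sketch below. It is an illustration under assumptions, not the asker's actual class: the name CopyingCombinations, the results queue (used instead of printing, so the outcome can be inspected) and the all() helper are hypothetical, and the lowerBound filter from the question is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical task: same recursion as helper(), but no array is shared between tasks.
class CopyingCombinations extends RecursiveAction {
    private final int[] data;            // owned by exactly one task at a time
    private final int end, start, index;
    private final Queue<String> results; // thread-safe sink instead of System.out

    CopyingCombinations(int[] data, int end, int start, int index, Queue<String> results) {
        this.data = data;
        this.end = end;
        this.start = start;
        this.index = index;
        this.results = results;
    }

    @Override
    protected void compute() {
        if (index == data.length) {
            results.add(Arrays.toString(data));
        } else if (start >= end) {
            // Branch 1: put 'start' at position 'index', working on a private copy.
            int[] copy = data.clone();
            copy[index] = start;
            CopyingCombinations take =
                new CopyingCombinations(copy, end, start - 1, index + 1, results);
            // Branch 2: skip 'start'. This task never touches 'data' again,
            // so the array is handed over to that one subtask.
            CopyingCombinations skip =
                new CopyingCombinations(data, end, start - 1, index, results);
            // Runs both subtasks and returns only when both have completed.
            invokeAll(take, skip);
        }
    }

    // Collects the combinations of every length for the inclusive range [end, start].
    static List<String> all(int end, int start) {
        Queue<String> results = new ConcurrentLinkedQueue<>();
        ForkJoinPool pool = new ForkJoinPool();
        for (int r = start - end + 1; r > 0; r--) {
            pool.invoke(new CopyingCombinations(new int[r], end, start, 0, results));
        }
        pool.shutdown();
        return new ArrayList<>(results); // order depends on scheduling
    }
}
```

CopyingCombinations.all(0, 2) yields the same seven combinations as the sequential algorithm in the question, in an order that depends on scheduling. Because invokeAll() waits for both subtasks, the root task only completes when the whole tree has completed, which is what the bare fork() calls in the question do not guarantee. One task per recursion step is far too fine-grained for real use, though.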

But there’s another possibility: don’t use arrays at all. You can store combinations in int values, as each int value is a combination of bits. This not only saves a lot of memory; you can also easily iterate over all possible combinations by just incrementing the value, as iterating over all int numbers also iterates over all possible bit combinations¹. The only thing we need to implement is generating the right string for a particular int value by interpreting the bits as numbers according to their position.
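To make the encoding concrete, here is a small sketch of the idea. The class and method names (BitCombinationDemo, decode) are made up for illustration: bit position p of the int stands for the number p + offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the bit encoding; not part of the answer's code.
class BitCombinationDemo {
    // Returns the numbers represented by the set bits of 'mask':
    // bit position p stands for the number p + offset.
    static List<Integer> decode(int mask, int offset) {
        List<Integer> numbers = new ArrayList<>();
        for (int p = 0; p < Integer.SIZE; p++) {
            if ((mask & (1 << p)) != 0) {
                numbers.add(p + offset);
            }
        }
        return numbers;
    }

    public static void main(String[] args) {
        // Counting from 1 to 2^3 - 1 visits every non-empty subset of {3, 4, 5}.
        for (int mask = 1; mask < (1 << 3); mask++) {
            System.out.println(Integer.toBinaryString(mask) + " -> " + decode(mask, 3));
        }
    }
}
```

For instance, decode(0b101, 3) gives [3, 5] and decode(0b110, 3) gives [4, 5].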

For a first attempt, we can take the easy way and use already existing classes:

import java.util.BitSet;
import java.util.stream.Collectors;

public static void main(String[] args) {
    long t0 = System.nanoTime();
    combinations(10, 25);
    long t1 = System.nanoTime();
    System.out.println((t1 - t0)/1_000_000+" ms");
    System.out.flush();
}
static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(
            BitSet.valueOf(new long[]{i}).stream()
                  .mapToObj(b -> String.valueOf(b + start))
                  .collect(Collectors.joining(", ", "[", "]"))
        );
    }
}

The method uses an exclusive end, so for your example, you have to call it like combinations(0, 3) and it will print

[0]
[1]
[0, 1]
[2]
[0, 2]
[1, 2]
[0, 1, 2]
3 ms

Of course, timing may vary.

For the combinations(10, 25) example above, it prints all combinations, followed by 3477 ms on my machine. This sounds like an opportunity to optimize, but we should first think about which operations impose which costs.

Iterating over the combinations has been reduced to a trivial operation here. Creating the string is an order of magnitude more expensive. But this is still nothing compared to the actual printing which includes a data transfer to the operating system and, depending on the system, the actual rendering may add to our time. Since this is done while holding a lock within PrintStream, all threads attempting to print at the same time would be blocked, making it a nonparallelizable operation.

Let’s identify the fraction of the cost, by creating a new PrintStream, disabling the auto-flush on line breaks and using an insanely large buffer, capable of holding the entire output:

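import java.io.BufferedOutputStream;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.PrintStream;

public static void main(String[] args) {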
    System.setOut(new PrintStream(
        new BufferedOutputStream(new FileOutputStream(FileDescriptor.out),1<<20),false));
    long t0 = System.nanoTime();
    combinations(10, 25);
    long t1 = System.nanoTime();
    System.out.flush();
    long t2 = System.nanoTime();
    System.out.println((t1 - t0)/1_000_000+" ms");
    System.out.println((t2 - t0)/1_000_000+" ms");
    System.out.flush();
}
static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(
            BitSet.valueOf(new long[]{i}).stream()
                  .mapToObj(b -> String.valueOf(b + start))
                  .collect(Collectors.joining(", ", "[", "]"))
        );
    }
}

On my machine, it prints something on the order of

93 ms
3340 ms

This shows that the code spent more than three seconds on the nonparallelizable printing and only about 100 milliseconds on the calculation. For completeness, the following code goes a level down for the String generation:

static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(bits(i, start));
    }
}
static String bits(int bits, int offset) {
    StringBuilder sb = new StringBuilder().append('[');
    for(;;) {
        int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit);
        sb.append(num + offset);
        bits -= bit;
        if(bits == 0) break;
        sb.append(", ");
    }
    return sb.append(']').toString();
}

which halves the calculation time on my machine, while having no noticeable impact on the total time, which shouldn’t come as a surprise now.

But for educational purposes, ignoring the lack of potential acceleration, let’s discuss how we would parallelize this operation.

The sequential code already brought the task into a form which boils down to an iteration from a start value to an end value. Now, we rewrite this code as a ForkJoinTask (or suitable subclass) which represents an iteration with a start and end value. Then, we add the ability to split this operation into two, by splitting the range in the middle, so we get two tasks, each iterating over one half of the range. This can be repeated until we decide that we have enough potentially parallel jobs and perform the current iteration locally. After the local processing we have to wait for the completion of any task we split off, to ensure that the completion of the root task implies the completion of all subtasks.

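import java.io.BufferedOutputStream;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class Combinations extends RecursiveAction {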
    public static void main(String[] args) {
        System.setOut(new PrintStream(new BufferedOutputStream(
            new FileOutputStream(FileDescriptor.out),1<<20),false));
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        long t0 = System.nanoTime();
        Combinations job = Combinations.get(10, 25);
        pool.execute(job);
        job.join();
        long t1 = System.nanoTime();
        System.out.flush();
        long t2 = System.nanoTime();
        System.out.println((t1 - t0)/1_000_000+" ms");
        System.out.println((t2 - t0)/1_000_000+" ms");
        System.out.flush();
    }

    public static Combinations get(int min, int max) {
        return new Combinations(min, 1, (1 << (max - min)) - 1);
    }

    final int offset, from;
    int to;

    private Combinations(int offset, int from, int to) {
        this.offset = offset;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        ArrayDeque<Combinations> spawned = new ArrayDeque<>();
        while(getSurplusQueuedTaskCount() < 2) {
            int middle = (from + to) >>> 1;
            if(middle == from) break;
            Combinations forked = new Combinations(offset, middle, to);
            forked.fork();
            spawned.addLast(forked);
            to = middle - 1;
        }
        performLocal();
        for(;;) {
            Combinations forked = spawned.pollLast();
            if(forked == null) break;
            if(forked.tryUnfork()) forked.performLocal(); else forked.join();
        }
    }

    private void performLocal() {
        for(int i = from, stop = to; i <= stop; i++) {
            System.out.println(bits(i, offset));
        }
    }

    static String bits(int bits, int offset) {
        StringBuilder sb = new StringBuilder().append('[');
        for(;;) {
            int bit=Integer.lowestOneBit(bits), num=Integer.numberOfTrailingZeros(bit);
            sb.append(num + offset);
            bits -= bit;
            if(bits == 0) break;
            sb.append(", ");
        }
        return sb.append(']').toString();
    }
}

getSurplusQueuedTaskCount() gives us a hint about the saturation of the worker threads, in other words, whether forking more jobs might be beneficial. The returned number is compared with a threshold that is typically a small number. The more heterogeneous the jobs, and hence the expected workload, the higher the threshold should be, to allow more work-stealing when some jobs complete earlier than others. In our case, the workload is expected to be very well balanced.

There are two ways of splitting. Examples often create two or more forked subtasks, followed by joining them. This may lead to a large number of tasks just waiting for others. The alternative is to fork a subtask and alter the current task to represent the other. Here, the forked task represents the [middle, to] range whereas the current task is modified to represent the [from, middle - 1] range.
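For contrast, the first style (fork two subtasks, then join them) might look like the following sketch. It does not print anything; as a stand-in workload it counts how many numbers occur across all combinations. The class name CountElements, the THRESHOLD value and the count() helper are assumptions made for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example of the "fork two, join two" style of splitting.
class CountElements extends RecursiveTask<Long> {
    static final int THRESHOLD = 1 << 10; // assumed cut-off for sequential processing
    final int from, to;                   // inclusive range of bit masks

    CountElements(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {
            // Small enough: count the set bits of every mask in this thread.
            long sum = 0;
            for (int i = from; i <= to; i++) {
                sum += Integer.bitCount(i);
            }
            return sum;
        }
        int middle = (from + to) >>> 1;
        CountElements left = new CountElements(from, middle);
        CountElements right = new CountElements(middle + 1, to);
        left.fork();
        right.fork();
        // From here on, this task does no range processing of its own;
        // it only waits for (or helps run) its two children.
        long r = right.join(); // join in reverse order of forking
        long l = left.join();
        return l + r;
    }

    // Total number of elements over all non-empty subsets of an n-element range.
    static long count(int n) {
        return ForkJoinPool.commonPool().invoke(new CountElements(1, (1 << n) - 1));
    }
}
```

For a range of length 15, count(15) returns 245760, which is 15 * 2^14. Every non-leaf task in this variant exists only to split and wait, which is the overhead the answer's Combinations class avoids by shrinking its own range instead.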

After forking enough tasks, the remaining range is processed locally in the current thread. Then, the task will wait for all forked subtasks, with one optimization: it will try to unfork the subtasks, to process them locally if no other worker thread has stolen them yet.

This works smoothly, but unfortunately, as expected, it does not accelerate the operation, as the most expensive part is the printing.
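If the string generation itself ever became the bottleneck, one way to keep the PrintStream lock out of the parallel part would be to format in parallel and print once. The sketch below is a different technique from the fork/join class above: it uses a parallel IntStream, and the class name FormatThenPrint and the method formatAll are hypothetical. The bits() method is copied from the code above.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical variant: build the whole output in parallel, then print it in one call.
class FormatThenPrint {
    // Same string generation as bits() in the answer; 'bits' must be non-zero.
    static String bits(int bits, int offset) {
        StringBuilder sb = new StringBuilder().append('[');
        for (;;) {
            int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit);
            sb.append(num + offset);
            bits -= bit;
            if (bits == 0) break;
            sb.append(", ");
        }
        return sb.append(']').toString();
    }

    // Formats all combinations of [start, end) in parallel.
    // The joining collector keeps the encounter order of the ordered stream.
    static String formatAll(int start, int end) {
        return IntStream.rangeClosed(1, (1 << (end - start)) - 1)
                .parallel()
                .mapToObj(i -> bits(i, start))
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        System.out.println(formatAll(0, 3)); // a single println, a single lock acquisition
    }
}
```

This holds the entire output in memory, so it only suits ranges whose output fits comfortably in the heap, and the print itself still takes as long as before.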

¹ Using an int to represent all combinations reduces the supported range length to 31, but keep in mind that such a range length implies 2³¹ - 1 combinations, which is quite a lot to iterate over. If that still feels like a limitation, you may change the code to use long instead. The then-supported range length of 63, in other words 2⁶³ - 1 combinations, is enough to keep the computer busy until the end of the universe.
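A long-based version of the string generation might look like this. It is a sketch: the class name LongBits is made up, and the method mirrors bits() from the code above with the Long counterparts of the Integer methods.

```java
// Hypothetical long-based counterpart of bits(); 'bits' must be non-zero.
class LongBits {
    static String bits(long bits, int offset) {
        StringBuilder sb = new StringBuilder().append('[');
        for (;;) {
            long bit = Long.lowestOneBit(bits);        // isolate the lowest set bit
            int num = Long.numberOfTrailingZeros(bit); // its position within the long
            sb.append(num + offset);
            bits -= bit;                               // clear that bit
            if (bits == 0) break;
            sb.append(", ");
        }
        return sb.append(']').toString();
    }
}
```

For example, LongBits.bits((1L << 40) | 1L, 10) returns "[10, 50]". The loop bound in combinations() would need the same treatment, that is 1L << (end - start) with a long loop variable.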
