Reader#lines()由于其spliterator中的不可配置的批量大小策略而严重并行化 [英] Reader#lines() parallelizes badly due to nonconfigurable batch size policy in its spliterator

查看:90
本文介绍了Reader#lines()由于其spliterator中的不可配置的批量大小策略而严重并行化的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当流源是 Reader 时,我无法实现流处理的良好并行化。在四核CPU上运行下面的代码我首先观察到3个核心,然后突然下降到两个核心,然后是一个核心。整体CPU利用率徘徊在50%左右。

I cannot achieve good parallelization of stream processing when the stream source is a Reader. Running the code below on a quad-core CPU I observe 3 cores being used at first, then a sudden drop to just two, then one core. Overall CPU utilization hovers around 50%.

请注意示例的以下特征:

Note the following characteristics of the example:


  • 只有6,000行;

  • 每行需要大约20毫秒来处理;

  • 整个过程大约需要一分钟。

这意味着所有压力都在CPU上,I / O很小。该示例是一个用于自动并行化的坐便器。

That means that all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...    

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),
                 realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
    System.out.format("      Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
    System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
  }

  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  private static Path createInput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}

我的典型输出:

          Cores: 4
       CPU time: 110.23 s
      Real time: 53.60 s
CPU utilization: 51.41%

为了进行比较,如果我使用稍微修改过的变体,我首先将其收集到列表中然后处理:

For comparison, if I use a slightly modified variant where I first collect into a list and then process:

Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
  .forEach(w::println);

我得到这个典型的输出:

I get this typical output:

          Cores: 4
       CPU time: 138.43 s
      Real time: 35.00 s
CPU utilization: 98.87%

有什么可以解释这种影响,我该如何解决它以获得充分利用?

What could explain that effect, and how can I work around it to get full utilization?

请注意,我最初在servlet输入流的读者上观察到这一点,因此它不是特定于 FileReader

Note that I have originally observed this on a reader of servlet input stream so it's not specific to a FileReader.

推荐答案

以下是 Spliterators.IteratorSpliterator 源代码中的答案,使用的答案BufferedReader#lines()

Here is the answer, spelled out in the source code of Spliterators.IteratorSpliterator, the one used by BufferedReader#lines():

    @Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a, 0, j, characteristics);
        }
        return null;
    }

另外值得注意的是常数:

Also noteworthy are the constants:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

所以在我的例子中,我使用6,000个元素,因为批量大小步骤我只得到三个批次这恰好解释了我的观察结果,最初使用了三个核心,当较小的批次完成时,使用的是两个核心,然后是一个核心。与此同时,我尝试了一个包含60,000个元素的修改示例,然后我获得了几乎100%的CPU利用率。

So in my example, where I use 6,000 elements, I get just three batches because the batch size step is 1024. That precisely explains my observation that initially three cores are used, dropping to two and then one as the smaller batches complete. In the meantime I tried a modified example with 60,000 elements and then I get almost 100% CPU utilization.

为了解决我的问题,我开发了下面的代码,它允许我将任何现有的流转换为 Spliterator#trySplit 将其分割为指定大小的批次。从我的问题中将它用于用例的最简单方法是这样的:

To solve my problem I have developed the code below which allows me to turn any existing stream into one whose Spliterator#trySplit will partition it into batches of specified size. The simplest way to use it for the use case from my question is like this:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)

在较低的级别,下面的类是一个spliterator包装器,它改变了包裹分裂器的 trySplit 行为并保持其他方面不变。

On a lower level, the class below is a spliterator wrapper which changes the wrapped spliterator's trySplit behavior and leaves other aspects unchanged.

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, toWrap.estimateSize(), batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}

这篇关于Reader#lines()由于其spliterator中的不可配置的批量大小策略而严重并行化的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆