Reader#lines() parallelizes badly due to nonconfigurable batch size policy in its spliterator


Problem Description

I cannot achieve good parallelization of stream processing when the stream source is a Reader. Running the code below on a quad-core CPU I observe 3 cores being used at first, then a sudden drop to just two, then one core. Overall CPU utilization hovers around 50%.

Note the following characteristics of the example:

  • there are only 6,000 lines;
  • each line takes about 20 ms to process;
  • the whole procedure takes about a minute.

That means that all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...    

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),
                 realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       CPU time: %.2f s
", cpuTime/SECONDS.toNanos(1));
    System.out.format("      Real time: %.2f s
", realTime/SECONDS.toNanos(1));
    System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
  }

  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  private static Path createInput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}

My typical output:

          Cores: 4
       CPU time: 110.23 s
      Real time: 53.60 s
CPU utilization: 51.41%

For comparison, if I use a slightly modified variant where I first collect into a list and then process:

Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
  .forEach(w::println);

I get this typical output:

          Cores: 4
       CPU time: 138.43 s
      Real time: 35.00 s
CPU utilization: 98.87%

What could explain that effect, and how can I work around it to get full utilization?

Note that I originally observed this on a reader of a servlet input stream, so it's not specific to a FileReader.
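
For instance, the servlet variant of the source looks roughly like this sketch (ServletLineSource and its lines() helper are made-up names for illustration):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
import javax.servlet.ServletRequest;

class ServletLineSource {
  // Expose a servlet request body as a stream of lines; this goes through
  // the same iterator-backed spliterator as Files.lines()/BufferedReader#lines().
  static Stream<String> lines(ServletRequest request) throws IOException {
    return new BufferedReader(
        new InputStreamReader(request.getInputStream(), StandardCharsets.UTF_8)).lines();
  }
}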

Recommended Answer

Here is the answer, spelled out in the source code of Spliterators.IteratorSpliterator, the one used by BufferedReader#lines():

    @Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a, 0, j, characteristics);
        }
        return null;
    }

Also noteworthy are the constants:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

So in my example, where I use 6,000 elements, I get just three batches because the batch size step is 1024. That precisely explains my observation that initially three cores are used, dropping to two and then one as the smaller batches complete. I have since tried a modified example with 60,000 elements, and with it I get almost 100% CPU utilization.
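
To make the arithmetic concrete: the splits grow by BATCH_UNIT each time, so the 6,000 lines come off as batches of 1,024, then 2,048, then the remaining 2,928. Here is a minimal sketch (my own illustration, not JDK code; the class name is made up) that replays the progression:

public class BatchProgression {
  static final int BATCH_UNIT = 1 << 10;  // 1024, as in the constants above
  static final int MAX_BATCH = 1 << 25;

  public static void main(String[] args) {
    long remaining = 6_000;  // line count from the example
    int batch = 0;
    while (remaining > 0) {
      int n = Math.min(batch + BATCH_UNIT, MAX_BATCH);
      // An unsized source simply runs out of elements mid-batch.
      n = (int) Math.min(n, remaining);
      System.out.println("batch of " + n);
      batch = n;
      remaining -= n;
    }
    // Prints: batch of 1024, batch of 2048, batch of 2928 --
    // three unequal chunks of work shared among four cores.
  }
}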

To solve my problem I have developed the code below which allows me to turn any existing stream into one whose Spliterator#trySplit will partition it into batches of specified size. The simplest way to use it for the use case from my question is like this:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
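
For context, here is a sketch of that call dropped into the pipeline from my question (with toFixedBatchStream statically imported; it already returns a parallel stream, so no .parallel() call is needed):

try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
  toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
      .map(Main::processLine)
      .forEach(w::println);
}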

On a lower level, the class below is a spliterator wrapper which changes the wrapped spliterator's trySplit behavior and leaves other aspects unchanged.

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, toWrap.estimateSize(), batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    // Drain up to batchSize elements from the wrapped spliterator into an array
    // and hand that fixed-size batch to the caller; the rest stays with this one.
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}
