限制和无序流的内部更改 [英] Internal changes for limit and unordered stream

查看:60
本文介绍了限制和无序流的内部更改的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

基本上这是在试图回答另一个问题时出现的。假设这段代码:

Basically this came up while trying to answer another question. Suppose this code:

AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
        .parallel()
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);

System.out.println("count = " + count);

我理解 IntStream#generate 是一个无序无限流并且要完成它必须有一个短路操作(在这种情况下 limit )。我也理解 Supplier 可以在Stream实现达到该限制之前被调用多次。

I understand the fact that IntStream#generate is an unordered infinite stream and for it to finish there has to be a short-circuiting operation (limit in this case). I also understand that the Supplier is free to be called as many number of times the Stream implementation feels like before it reaches that limit.

在java-8下运行,将打印 count 始终 512 (可能并非总是如此,但是在我的机器上就是这样。)

Running this under java-8, would print count always 512 (may be not always, but it is so on my machine).

在对比运行中,这在java-10下很少超过 5 。所以我的问题是内部发生了什么改变,短路发生得更好(我试图通过拥有源代码并尝试做一些差异来解决这个问题......)

On the contrast running this under java-10 rarely exceeds 5. So my question is what changed internally that the short-circuiting happens so much better (I am trying to answer this on my own by having the sources and trying to do some diffs... )

推荐答案

这种变化发生在Java 9,beta 103和Java 9,beta 120之间( JDK-8154387

The change happened somewhere between Java 9, beta 103 and Java 9, beta 120 (JDK‑8154387).

负责类是 StreamSpliterators.UnorderedSliceSpliterator.OfInt ,resp。它的超类 StreamSpliterators.UnorderedSliceSpliterator

The responsible class is StreamSpliterators.UnorderedSliceSpliterator.OfInt, resp. its super class StreamSpliterators.UnorderedSliceSpliterator.

该类的旧版本看起来像

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
    }

...

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

我们可以看到,它尝试缓冲到 CHUNK_SIZE = 1<< 7 每个分裂器中的元素,最终可能以CPU核心数×128个元素结束。

As we can see, it attempts to buffer up to CHUNK_SIZE = 1 << 7 elements in each spliterator, which may end up at "number of CPU cores"×128 elements.

相比之下,新版本看起来很新喜欢

In contrast, the new version looks like

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    protected final int chunkSize;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
            ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
        this.chunkSize = parent.chunkSize;
    }

...

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of chunkSize
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(chunkSize);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

所以现在有一个实例字段 chunkSize 。当有一个定义的限制并且表达式((跳过+限制)/ AbstractTask.LEAF_TARGET)+ 1 计算的值小于 CHUNK_SIZE ,将使用较小的值。因此,当限制较小时, chunkSize 将会小得多。在您的情况下,限制为 5 ,块大小将始终为 1

So now there is an instance field chunkSize. When there is a defined limit and the expression ((skip + limit) / AbstractTask.LEAF_TARGET) + 1 evaluates to a smaller value than CHUNK_SIZE, that smaller value will be used. So when having small limits, the chunkSize will be much smaller. In your case with a limit of 5, the chunk size will always be 1.

这篇关于限制和无序流的内部更改的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆