Why does a filter with side effects perform better than a Spliterator-based implementation?


Problem description

Regarding the question "How to skip even lines of a Stream obtained from the Files.lines", I followed the accepted answer's approach and implemented my own filterEven() method based on the Spliterator<T> interface, e.g.:

public static <T> Stream<T> filterEven(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
    {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            iter.tryAdvance(item -> {});    // discard
            return iter.tryAdvance(action); // use
        }
    };
    return StreamSupport.stream(res, false);
}

which I can use in the following way:

Stream<DomainObject> res = filterEven(Files.lines(src))
     .map(line -> toDomainObject(line));

However, when I measured the performance of this approach against the next one, which uses a filter() with side effects, I noticed that the next one performs better:

final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
     .filter(isEvenLine)
     .map(line -> toDomainObject(line));
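
As a sanity check before comparing speed, the two approaches can be run side by side on a small array to confirm that they select the same lines. This is only a minimal sketch: the class name EvenLinesDemo, the helper methods and the sample data are made up for illustration, and an array stands in for Files.lines.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class EvenLinesDemo {
    // Spliterator-based variant, same logic as filterEven() above.
    static <T> Stream<T> filterEven(Stream<T> src) {
        Spliterator<T> iter = src.spliterator();
        AbstractSpliterator<T> res =
                new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                iter.tryAdvance(item -> {});    // discard the odd-numbered line
                return iter.tryAdvance(action); // pass on the even-numbered line
            }
        };
        return StreamSupport.stream(res, false);
    }

    // Hypothetical helper: collects the result of the Spliterator-based variant.
    static List<String> withSpliterator(String[] lines) {
        return filterEven(Arrays.stream(lines)).collect(Collectors.toList());
    }

    // Hypothetical helper: collects the result of the stateful-filter variant.
    static List<String> withStatefulFilter(String[] lines) {
        final int[] counter = {0};
        final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
        return Arrays.stream(lines).filter(isEvenLine).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String[] lines = {"l1", "l2", "l3", "l4", "l5"}; // made-up sample data
        System.out.println(withSpliterator(lines));
        System.out.println(withStatefulFilter(lines));
    }
}
```

Note that the stateful predicate only behaves this way on a sequential stream; in a parallel stream the shared counter would make the result unpredictable.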

I tested the performance with JMH, and the file load is not included in the benchmark: I load the file into an array beforehand. Each benchmark then starts by creating a Stream<String> from that array, filters the even lines, applies mapToInt() to extract the value of an int field, and finally applies a max() operation. Here is one of the benchmarks (the original post links to the whole program and to the data file, which has about 186 lines):

@Benchmark
public int maxTempFilterEven(DataSource src){
    Stream<String> content = Arrays.stream(src.data)
            .filter(s-> s.charAt(0) != '#') // Filter comments
            .skip(1);                       // Skip line: Not available
    return filterEven(content)              // Filter daily info and skip hourly
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}

I don't understand why the filter() approach has better performance (~80 ops/ms) than filterEven() (~50 ops/ms).

Recommended answer

Intro

I think I know the reason, but unfortunately I have no idea how to improve the performance of the Spliterator-based solution (at least not without rewriting the whole Streams API feature).

Sidenote 1: performance was not the most important design goal when the Stream API was designed. If performance is critical, rewriting the code without the Stream API will most probably make it faster. (For example, the Stream API unavoidably increases memory allocation and thus GC pressure.) On the other hand, in most scenarios the Stream API provides a nicer, higher-level API at the cost of a relatively small performance degradation.

Part 1. Short theoretical answer

Stream is designed to use a kind of internal iteration as the main means of consumption; external iteration (i.e. Spliterator-based) is an additional means that is, in a sense, "emulated". Thus external iteration involves some overhead. Laziness puts some limits on the efficiency of external iteration, and the need to support flatMap makes it necessary to use some kind of dynamic buffer in this process.

Sidenote 2: in some cases Spliterator-based iteration might be as fast as internal iteration (i.e. filter in this case). In particular, this is so when you create a Spliterator directly from the Stream that holds the data. To see it, you can modify your tests to materialize the result of your first filter into an array of Strings:

String[] filteredData = Arrays.stream(src.data)
            .filter(s-> s.charAt(0) != '#') // Filter comments
            .skip(1)  
            .toArray(String[]::new);

and then compare the performance of maxTempFilter and maxTempFilterEven modified to accept that pre-filtered String[] filteredData. If you want to know why this is so, you should probably read the rest of this long answer, or at least Part 2.
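
One observable hint of the difference, without looking at JDK internals, is the set of characteristics the two Spliterators report. The sketch below is only an illustration with made-up names and data: a Spliterator requested directly on an array-backed stream reports SIZED, while one requested on a pipeline that contains a filter does not. This does not prove which implementation class is returned; it only shows that the two are not the same kind of Spliterator.

```java
import java.util.Arrays;
import java.util.Spliterator;

class SourceSpliteratorDemo {
    // Spliterator requested directly on the source stream.
    static Spliterator<String> direct(String[] data) {
        return Arrays.stream(data).spliterator();
    }

    // Spliterator requested on a pipeline with one intermediate operation.
    static Spliterator<String> afterFilter(String[] data) {
        return Arrays.stream(data)
                .filter(s -> s.charAt(0) != '#')
                .spliterator();
    }

    public static void main(String[] args) {
        String[] data = {"#comment", "a", "b"}; // made-up sample data
        System.out.println(direct(data).hasCharacteristics(Spliterator.SIZED));
        System.out.println(direct(data).getExactSizeIfKnown());
        System.out.println(afterFilter(data).hasCharacteristics(Spliterator.SIZED));
        System.out.println(afterFilter(data).getExactSizeIfKnown());
    }
}
```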

Part 2. Longer theoretical answer

Streams were designed to be consumed mainly as a whole by some terminal operation. Iterating over the elements one by one is supported, but it is not designed as the main way to consume streams.

Note that with the "functional" Stream API operations such as map, flatMap, filter, reduce, and collect, you can't say at some step "I have had enough data, stop iterating over the source and pushing values". You can discard some incoming data (as filter does) but you can't stop the iteration. (The limit and skip transformations are actually implemented using a Spliterator inside; anyMatch, allMatch, noneMatch, findFirst, findAny, etc. use the non-public API j.u.s.Sink.cancellationRequested, and they are also easier because there can't be several terminal operations.) If all transformations in the pipeline are synchronous, you can combine them into a single aggregated function (a Consumer) and call it in a simple loop (optionally splitting the loop execution over several threads). This is what my simplified version of the state-based filter represents (see the code in the "Show me some code" section). It gets a bit more complicated if there is a flatMap in the pipeline, but the idea is still the same.
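
The idea of combining synchronous stages into one aggregated Consumer can be sketched as follows. This is not how the JDK implements it (the real code uses the internal Sink type); filterStage, mapStage and the class name are hypothetical helpers invented for this illustration, and the input is assumed to be non-empty strings holding integers.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class FusedPipelineSketch {
    // Hypothetical helper: a filter stage that forwards matching elements downstream.
    static <T> Consumer<T> filterStage(Predicate<? super T> p, Consumer<? super T> downstream) {
        return t -> {
            if (p.test(t)) {
                downstream.accept(t);
            }
        };
    }

    // Hypothetical helper: a map stage that forwards the transformed element downstream.
    static <T, R> Consumer<T> mapStage(Function<? super T, ? extends R> f,
                                       Consumer<? super R> downstream) {
        return t -> downstream.accept(f.apply(t));
    }

    // Keeps every second element, parses it and tracks the maximum.
    static int maxOfEvenPositions(String[] data) {
        int[] max = {Integer.MIN_VALUE};
        int[] counter = {0};
        // Stages are built from the terminal step backwards.
        Consumer<Integer> reduce = v -> max[0] = Math.max(max[0], v);
        Consumer<String> parse = mapStage(Integer::parseInt, reduce);
        Consumer<String> pipeline = filterStage(s -> ++counter[0] % 2 == 0, parse);
        // The source drives the iteration: one plain loop pushes every element.
        for (String s : data) {
            pipeline.accept(s);
        }
        return max[0];
    }

    public static void main(String[] args) {
        System.out.println(maxOfEvenPositions(new String[] {"5", "9", "3", "7", "100"}));
    }
}
```

Here the source loop is in control and each stage is just a nested callback, which is why no intermediate buffer is needed.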

A Spliterator-based transformation is fundamentally different because it adds an asynchronous, consumer-driven step to the pipeline. Now the Spliterator, rather than the source Stream, drives the iteration process. If you ask for a Spliterator directly on the source Stream, it might be able to return an implementation that just iterates over its internal data structure, and this is why materializing the pre-filtered data should remove the performance difference. However, if you create a Spliterator for some non-empty pipeline, there is no other (simple) choice than asking the source to push elements one by one through the pipeline until some element passes all the filters (see also the second example in the "Show me some code" section). The fact that source elements are pushed one by one rather than in batches is a consequence of the fundamental decision to make Streams lazy. The need for a buffer instead of just one element is a consequence of supporting flatMap: pushing one element from the source can produce many elements for the Spliterator.
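
Both effects can be observed from the outside with peek(). The following sketch uses made-up data and names: the first method records which source elements are pulled by a single tryAdvance on a filtered pipeline, and the second records how many elements a single tryAdvance causes a flatMap stage to produce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

class OneByOnePushDemo {
    // Source elements pulled by one tryAdvance on a filtered pipeline.
    static List<String> pulledForFirstMatch() {
        List<String> pulled = new ArrayList<>();
        Spliterator<String> sp = Stream.of("#c1", "#c2", "a", "b", "c")
                .peek(pulled::add)                 // records every element pushed by the source
                .filter(s -> s.charAt(0) != '#')
                .spliterator();
        sp.tryAdvance(s -> {});                    // ask for exactly one element
        return pulled;
    }

    // Elements produced behind a flatMap by one tryAdvance.
    static List<String> producedByFlatMap() {
        List<String> produced = new ArrayList<>();
        Spliterator<String> sp = Stream.of("x")
                .flatMap(s -> Stream.of(s + "1", s + "2", s + "3"))
                .peek(produced::add)               // records every element leaving flatMap
                .spliterator();
        sp.tryAdvance(s -> {});                    // ask for exactly one element
        return produced;
    }

    public static void main(String[] args) {
        System.out.println(pulledForFirstMatch());
        System.out.println(producedByFlatMap());
    }
}
```

On the filtered pipeline the source is asked for elements only until the first one passes the filter, so "b" and "c" are not touched. On the flatMap pipeline a single source element yields three results at once, and the two that were not requested have to be kept somewhere until the next tryAdvance.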

Part 3. Show me some code

This part tries to back up what was described in the "theoretical" parts with some code (both links to the real code and simulated code).

First of all, you should know that the current Streams API implementation accumulates non-terminal (intermediate) operations into a single lazy pipeline (see j.u.s.AbstractPipeline and its children such as j.u.s.ReferencePipeline). Then, when the terminal operation is applied, all the elements from the original Stream are "pushed" through the pipeline.

What you see is the result of two things:



  1. the fact that stream pipelines are different when there is a Spliterator-based step inside.
  2. the fact that your OddLines is not the first step in the pipeline.

The code with a stateful filter is more or less similar to the following straightforward code:

static int similarToFilter(String[] data)
{
    final int[] counter = {0};
    final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
    int skip = 1;

    boolean reduceEmpty = true;
    int reduceState = 0;
    for (String outerEl : data)
    {
        if (outerEl.charAt(0) != '#')
        {
            if (skip > 0)
                skip--;
            else
            {
                if (isEvenLine.test(outerEl))
                {
                    int intEl = parseInt(outerEl.substring(14, 16));
                    if (reduceEmpty)
                    {
                        reduceState = intEl;
                        reduceEmpty = false;
                    }
                    else
                    {
                        reduceState = Math.max(reduceState, intEl);
                    }
                }
            }
        }
    }
    return reduceState;
}

Note that this is effectively a single loop with some calculations (filtering/transformations) inside.

When you add a Spliterator to the pipeline, on the other hand, things change significantly, and even with simplifications the code that is reasonably similar to what actually happens becomes much larger, such as:

interface Sp<T>
{
    public boolean tryAdvance(Consumer<? super T> action);
}

static class ArraySp<T> implements Sp<T>
{
    private final T[] array;
    private int pos;

    public ArraySp(T[] array)
    {
        this.array = array;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (pos < array.length)
        {
            action.accept(array[pos]);
            pos++;
            return true;
        }
        else
        {
            return false;
        }
    }
}

static class WrappingSp<T> implements Sp<T>, Consumer<T>
{
    private final Sp<T> sourceSp;
    private final Predicate<T> filter;

    private final ArrayList<T> buffer = new ArrayList<T>();
    private int pos;


    public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
    {
        this.sourceSp = sourceSp;
        this.filter = filter;
    }

    @Override
    public void accept(T t)
    {
        buffer.add(t);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        while (true)
        {
            if (pos >= buffer.size())
            {
                pos = 0;
                buffer.clear();
                sourceSp.tryAdvance(this);
            }
            // failed to fill buffer
            if (buffer.size() == 0)
                return false;

            T nextElem = buffer.get(pos);
            pos++;
            if (filter.test(nextElem))
            {
                action.accept(nextElem);
                return true;
            }
        }
    }
}

static class OddLineSp<T> implements Sp<T>, Consumer<T>
{
    private Sp<T> sourceSp;

    public OddLineSp(Sp<T> sourceSp)
    {
        this.sourceSp = sourceSp;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (sourceSp == null)
            return false;

        sourceSp.tryAdvance(this);          // discard
        if (!sourceSp.tryAdvance(action))   // use
        {
            // source exhausted: nothing was passed to action
            sourceSp = null;
            return false;
        }
        return true;
    }

    @Override
    public void accept(T t)
    {

    }
}

static class ReduceIntMax
{
    boolean reduceEmpty = true;
    int reduceState = 0;

    public int getReduceState()
    {
        return reduceState;
    }

    public void accept(int t)
    {
        if (reduceEmpty)
        {
            reduceEmpty = false;
            reduceState = t;
        }
        else
        {
            reduceState = Math.max(reduceState, t);
        }
    }
}


static int similarToSpliterator(String[] data)
{
    ArraySp<String> src = new ArraySp<>(data);

    int[] skip = new int[1];
    skip[0] = 1;
    WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
    {
        if (s.charAt(0) == '#')
            return false;
        if (skip[0] != 0)
        {
            skip[0]--;
            return false;
        }
        return true;
    });
    OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
    final ReduceIntMax reduceIntMax = new ReduceIntMax();
    while (oddLines.tryAdvance(s ->
                               {
                                   int intValue = parseInt(s.substring(14, 16));
                                   reduceIntMax.accept(intValue);
                               })) ; // do nothing in the loop body
    return reduceIntMax.getReduceState();
}

This code is larger because the logic is impossible (or at least very hard) to express without some non-trivial stateful callbacks inside the loop. Here the interface Sp is a mix of the j.u.s.Stream and j.u.Spliterator interfaces.


  • Class ArraySp represents the result of Arrays.stream.

  • Class WrappingSp is similar to j.u.s.StreamSpliterators.WrappingSpliterator, which in the real code is the implementation of the Spliterator interface for any non-empty pipeline, i.e. a Stream with at least one intermediate operation applied to it (see the j.u.s.AbstractPipeline.spliterator method). In my code I merged it with a StatelessOp subclass and put the logic responsible for the filter method implementation there. Also, for simplicity, I implemented skip using the filter.

  • OddLineSp corresponds to your OddLines and its resulting Stream.

  • ReduceIntMax represents the ReduceOps terminal operation for Math.max on int.

So what's important in this example? The important thing here is that since you first filter your original stream, your OddLineSp is created from a non-empty pipeline, i.e. from a WrappingSp. And if you take a closer look at WrappingSp, you'll notice that every time tryAdvance is called, it delegates the call to sourceSp and accumulates the result(s) into a buffer. Moreover, since you have no flatMap in the pipeline, elements are copied into the buffer one by one. That is, every time WrappingSp.tryAdvance is called, it calls ArraySp.tryAdvance, gets back exactly one element (via the callback), and passes it on to the consumer provided by the caller (unless the element doesn't match the filter, in which case ArraySp.tryAdvance is called again and again, but the buffer is still never filled with more than one element at a time).

Sidenote 3: if you want to look at the real code, the most interesting places are j.u.s.StreamSpliterators.WrappingSpliterator.tryAdvance, which calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.doAdvance, which in turn calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.fillBuffer, which in turn calls the pusher that is initialized in j.u.s.StreamSpliterators.WrappingSpliterator.initPartialTraversalState.

So the main thing that hurts performance is this copying into the buffer. Unfortunately for us ordinary Java developers, the current implementation of the Stream API is pretty much closed, and you can't modify only some aspects of its internal behavior using inheritance or composition. You may use some reflection-based hacking to make the copying into the buffer more efficient for your specific case and gain some performance (at the cost of the Stream's laziness), but you can't avoid this copying altogether, and thus Spliterator-based code will be slower anyway.

Going back to the example from Sidenote 2, the Spliterator-based test with the materialized filteredData works faster because there is no WrappingSp in the pipeline before OddLineSp, and thus there is no copying into an intermediate buffer.
