Why filter with side effects performs better than a Spliterator based implementation?

Question
Regarding the question "How to skip even lines of a Stream obtained from Files.lines", I followed the accepted answer approach, implementing my own filterEven() method based on the Spliterator<T> interface, e.g.:
public static <T> Stream<T> filterEven(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            iter.tryAdvance(item -> {});    // discard
            return iter.tryAdvance(action); // use
        }
    };
    return StreamSupport.stream(res, false);
}
which I can use in the following way:

Stream<String> lines = Files.lines(src);
Stream<DomainObject> res = filterEven(lines)
        .map(line -> toDomainObject(line));
However, measuring the performance of this approach against the next one, which uses a filter() with side effects, I noticed that the next one performs better:

final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;

Stream<DomainObject> res = Files.lines(src)
        .filter(isEvenLine)
        .map(line -> toDomainObject(line));
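As a quick sanity check, the stateful-filter approach can be condensed into a minimal runnable sketch (class name and sample data are made up for illustration; note that the mutable counter makes this correct only for sequential, ordered streams):

```java
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StatefulFilterDemo {
    static String evens(Stream<String> src) {
        // Mutable state captured by the predicate -- only safe for sequential streams.
        final int[] counter = {0};
        final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
        return src.filter(isEvenLine).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Keeps every second element of the ordered, sequential stream.
        System.out.println(evens(Stream.of("a", "b", "c", "d", "e"))); // prints b,d
    }
}
```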
I tested the performance with JMH, and I am not including the file load in the benchmark; I previously load it into an array. Then each benchmark starts by creating a Stream<String> from the previous array, then filtering even lines, then applying a mapToInt() to extract the value of an int field and finally a max() operation. Here is one of the benchmarks (you can check the whole Program here, and here you have the data file with about 186 lines):
@Benchmark
public int maxTempFilterEven(DataSource src) {
    Stream<String> content = Arrays.stream(src.data)
            .filter(s -> s.charAt(0) != '#') // Filter comments
            .skip(1);                        // Skip line: Not available
    return filterEven(content)               // Filter daily info and skip hourly
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}
I am not getting why the filter() approach has better performance (~80 ops/ms) than the filterEven() one (~50 ops/ms)?
Answer

Intro
I think I know the reason, but unfortunately I have no idea how to improve the performance of the Spliterator-based solution (at least without rewriting the whole Streams API feature).
Sidenote 1: performance was not the most important design goal when the Stream API was designed. If performance is critical, most probably rewriting the code without the Stream API will make the code faster. (For example, the Stream API unavoidably increases memory allocation and thus GC pressure.) On the other hand, in most scenarios the Stream API provides a nicer, higher-level API at the cost of a relatively small performance degradation.
Part 1 or short theoretical answer

Stream is designed to implement a kind of internal iteration as the main means of consuming, and external iteration (i.e. Spliterator-based) is an additional means that is kind of "emulated". Thus external iteration involves some overhead. Laziness adds some limits to the efficiency of external iteration, and the need to support flatMap makes it necessary to use some kind of dynamic buffer in this process.
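A small sketch (class and method names are illustrative) of why flatMap forces such a buffer: a Spliterator over a flatMap pipeline must hand out one element per tryAdvance call, even though a single source element may expand into several, so the extras have to be parked somewhere between calls:

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class FlatMapBufferDemo {
    static String flattenAbc() {
        // One source element ("abc") expands into three pipeline elements;
        // the wrapping Spliterator buffers the extras between tryAdvance calls.
        Spliterator<String> sp = Stream.of("abc")
                .flatMap(s -> s.chars().mapToObj(c -> String.valueOf((char) c)))
                .spliterator();
        StringBuilder sb = new StringBuilder();
        while (sp.tryAdvance(sb::append)) { } // pull elements one by one
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(flattenAbc()); // prints abc
    }
}
```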
Sidenote 2: In some cases Spliterator-based iteration might be as fast as the internal iteration (i.e. filter in this case). Particularly, it is so when you create the Spliterator directly from the data-containing Stream. To see it, you can modify your tests to materialize your first filter into a String[] array:
String[] filteredData = Arrays.stream(src.data)
        .filter(s -> s.charAt(0) != '#') // Filter comments
        .skip(1)
        .toArray(String[]::new);
and then compare the performance of maxTempFilter and maxTempFilterEven modified to accept that pre-filtered String[] filteredData. If you want to know why this is so, you should probably read the rest of this long answer, or at least Part 2.
Part 2 or longer theoretical answer:

Streams were designed to be mainly consumed as a whole by some terminal operation. Iterating over elements one by one, although supported, is not designed as a main way to consume streams.
Note that using the "functional" Stream API such as map, flatMap, filter, reduce, and collect you can't say at some step "I have had enough data, stop iterating over the source and pushing values". You can discard some incoming data (as filter does) but can't stop the iteration. (take and skip transformations are actually implemented using a Spliterator inside; and anyMatch, allMatch, noneMatch, findFirst, findAny, etc. use the non-public API j.u.s.Sink.cancellationRequested; they are also easier because there can't be several terminal operations.) If all transformations in the pipeline are synchronous, you can combine them into a single aggregated function (Consumer) and call it in a simple loop (optionally splitting the loop execution over several threads). This is what my simplified version of the state-based filter represents (see the code in the Show me some code section). It gets a bit more complicated if there is a flatMap in the pipeline, but the idea is still the same.
Spliterator-based transformation is fundamentally different because it adds an asynchronous, consumer-driven step to the pipeline. Now the Spliterator rather than the source Stream drives the iteration process. If you ask for a Spliterator directly on the source Stream, it might be able to return some implementation that just iterates over its internal data structure, and this is why materializing the pre-filtered data should remove the performance difference. However, if you create a Spliterator for some non-empty pipeline, there is no other (simple) choice than asking the source to push elements one by one through the pipeline until some element passes all the filters (see also the second example in the Show me some code section). The fact that source elements are pushed one by one rather than in batches is a consequence of the fundamental decision to make Streams lazy. The need for a buffer instead of just one element is a consequence of the support for flatMap: pushing one element from the source can produce many elements for the Spliterator.
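You can observe this one-by-one pushing from the outside by counting source pulls with peek while advancing a Spliterator obtained from a non-empty pipeline (a sketch; the exact count relies on the current OpenJDK implementation's behavior):

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class PullCountDemo {
    static int pullsForFirstMatch() {
        AtomicInteger pulls = new AtomicInteger();
        // Non-empty pipeline: spliterator() returns a wrapping implementation
        // that pushes source elements through peek and filter into its buffer.
        Spliterator<String> sp = Stream.of("#comment", "x", "y")
                .peek(s -> pulls.incrementAndGet()) // counts elements pushed from the source
                .filter(s -> s.charAt(0) != '#')
                .spliterator();
        sp.tryAdvance(s -> {}); // delivers "x"
        // "#comment" and "x" were pulled one by one until one passed the
        // filter; "y" has not been touched yet thanks to laziness.
        return pulls.get();
    }

    public static void main(String[] args) {
        System.out.println(pullsForFirstMatch()); // prints 2 on current OpenJDK
    }
}
```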
Part 3 or Show me some code

This part tries to provide some backing with code (both links to the real code and simulated code) for what was described in the "theoretical" parts.
First of all, you should know that the current Streams API implementation accumulates non-terminal (intermediate) operations into a single lazy pipeline (see j.u.s.AbstractPipeline and its children such as j.u.s.ReferencePipeline). Then, when the terminal operation is applied, all the elements from the original Stream are "pushed" through the pipeline.
What you see is the result of two things:

- the fact that stream pipelines are different for cases when you have a Spliterator-based step inside
- the fact that your OddLines is not the first step in the pipeline
The code with the stateful filter is more or less similar to the following straightforward code:
static int similarToFilter(String[] data)
{
    final int[] counter = {0};
    final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
    int skip = 1;
    boolean reduceEmpty = true;
    int reduceState = 0;
    for (String outerEl : data)
    {
        if (outerEl.charAt(0) != '#')
        {
            if (skip > 0)
                skip--;
            else
            {
                if (isEvenLine.test(outerEl))
                {
                    int intEl = parseInt(outerEl.substring(14, 16));
                    if (reduceEmpty)
                    {
                        reduceState = intEl;
                        reduceEmpty = false;
                    }
                    else
                    {
                        reduceState = Math.max(reduceState, intEl);
                    }
                }
            }
        }
    }
    return reduceState;
}
Note that this is effectively a single loop with some calculations (filtering/transformations) inside.
When you add a Spliterator into the pipeline, on the other hand, things change significantly, and even with simplifications, code that is reasonably similar to what actually happens becomes much larger, such as:
interface Sp<T>
{
    public boolean tryAdvance(Consumer<? super T> action);
}

static class ArraySp<T> implements Sp<T>
{
    private final T[] array;
    private int pos;

    public ArraySp(T[] array)
    {
        this.array = array;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (pos < array.length)
        {
            action.accept(array[pos]);
            pos++;
            return true;
        }
        else
        {
            return false;
        }
    }
}
static class WrappingSp<T> implements Sp<T>, Consumer<T>
{
    private final Sp<T> sourceSp;
    private final Predicate<T> filter;
    private final ArrayList<T> buffer = new ArrayList<T>();
    private int pos;

    public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
    {
        this.sourceSp = sourceSp;
        this.filter = filter;
    }

    @Override
    public void accept(T t)
    {
        buffer.add(t);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        while (true)
        {
            if (pos >= buffer.size())
            {
                pos = 0;
                buffer.clear();
                sourceSp.tryAdvance(this);
            }
            // failed to fill buffer
            if (buffer.size() == 0)
                return false;
            T nextElem = buffer.get(pos);
            pos++;
            if (filter.test(nextElem))
            {
                action.accept(nextElem);
                return true;
            }
        }
    }
}
static class OddLineSp<T> implements Sp<T>, Consumer<T>
{
    private Sp<T> sourceSp;

    public OddLineSp(Sp<T> sourceSp)
    {
        this.sourceSp = sourceSp;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (sourceSp == null)
            return false;

        sourceSp.tryAdvance(this); // discard
        if (!sourceSp.tryAdvance(action))
        {
            sourceSp = null;
        }
        return true;
    }

    @Override
    public void accept(T t)
    {
    }
}
static class ReduceIntMax
{
    boolean reduceEmpty = true;
    int reduceState = 0;

    public int getReduceState()
    {
        return reduceState;
    }

    public void accept(int t)
    {
        if (reduceEmpty)
        {
            reduceEmpty = false;
            reduceState = t;
        }
        else
        {
            reduceState = Math.max(reduceState, t);
        }
    }
}
static int similarToSpliterator(String[] data)
{
    ArraySp<String> src = new ArraySp<>(data);
    int[] skip = new int[1];
    skip[0] = 1;
    WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
    {
        if (s.charAt(0) == '#')
            return false;
        if (skip[0] != 0)
        {
            skip[0]--;
            return false;
        }
        return true;
    });
    OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
    final ReduceIntMax reduceIntMax = new ReduceIntMax();
    while (oddLines.tryAdvance(s ->
    {
        int intValue = parseInt(s.substring(14, 16));
        reduceIntMax.accept(intValue);
    })) ; // do nothing in the loop body
    return reduceIntMax.getReduceState();
}
This code is larger because the logic is impossible (or at least very hard) to represent without some non-trivial stateful callbacks inside the loop. Here the interface Sp is a mix of the j.u.s.Stream and j.u.Spliterator interfaces.
- Class ArraySp represents a result of Arrays.stream.
- Class WrappingSp is similar to j.u.s.StreamSpliterators.WrappingSpliterator, which in the real code represents an implementation of the Spliterator interface for any non-empty pipeline, i.e. a Stream with at least one intermediate operation applied to it (see the j.u.s.AbstractPipeline.spliterator method). In my code I merged it with a StatelessOp subclass and put there the logic responsible for the filter method implementation. Also, for simplicity, I implemented skip using filter.
- OddLineSp corresponds to your OddLines and its resulting Stream.
- ReduceIntMax represents the ReduceOps terminal operation for Math.max for int.
So what's important in this example? The important thing here is that since you first filter your original stream, your OddLineSp is created from a non-empty pipeline, i.e. from a WrappingSp. And if you take a closer look at WrappingSp, you'll notice that every time tryAdvance is called, it delegates the call to the sourceSp and accumulates the result(s) into a buffer. Moreover, since you have no flatMap in the pipeline, elements will be copied to the buffer one by one. I.e. every time WrappingSp.tryAdvance is called, it will call ArraySp.tryAdvance, get back exactly one element (via a callback), and pass it further to the consumer provided by the caller (unless the element doesn't match the filter, in which case ArraySp.tryAdvance will be called again and again; but still, the buffer is never filled with more than one element at a time).
Sidenote 3: If you want to look at the real code, the most interesting places are j.u.s.StreamSpliterators.WrappingSpliterator.tryAdvance, which calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.doAdvance, which in turn calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.fillBuffer, which in turn calls the pusher that is initialized at j.u.s.StreamSpliterators.WrappingSpliterator.initPartialTraversalState.
So the main thing that's hurting performance is this copying into the buffer. Unfortunately for us usual Java developers, the current implementation of the Stream API is pretty much closed, and you can't modify only some aspects of its internal behavior using inheritance or composition. You may use some reflection-based hacking to make copying-to-buffer more efficient for your specific case and gain some performance (but sacrifice the laziness of the Stream), yet you can't avoid this copying altogether, and thus Spliterator-based code will be slower anyway.
Going back to the example from Sidenote #2: the Spliterator-based test with the materialized filteredData works faster because there is no WrappingSp in the pipeline before OddLineSp, and thus there is no copying into an intermediate buffer.