Can a Collector's combiner function ever be used on sequential streams?

Question

Sample program:

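import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    // Dummy combiner: throws if the stream implementation ever calls it.
    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        // Supplier and accumulator are ordinary; only the combiner is rigged.
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        // Sequential stream: box each int, then collect with the rigged collector.
        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}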

So, to simplify matters here, there is no final transformation, so the resulting code is quite simple.

Now, IntStream.range() produces a sequential stream. I simply box the results into Integers and then my contrived Collector collects them into a List<Integer>. Pretty simple.

And no matter how many times I run this sample program, the UnsupportedOperationException never hits, which means my dummy combiner is never called.

I kind of expected this, but then I have already misunderstood streams enough that I have to ask the question...

Can a Collector's combiner ever be called when the stream is guaranteed to be sequential?

Recommended answer

A careful reading of the streams implementation code in ReduceOps.java reveals that the combine function is called only when a ReduceTask completes, and ReduceTask instances are used only when evaluating a pipeline in parallel. Thus, in the current implementation, the combiner is never called when evaluating a sequential pipeline.
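One way to observe this behaviour without reading ReduceOps.java is to count combiner invocations instead of throwing. The sketch below is an illustration only: the class name CombinerCallCounter and the method countCombinerCalls are made up for this example, and the zero count for the sequential case reflects current implementations rather than anything the specification promises.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public final class CombinerCallCounter
{
    private CombinerCallCounter()
    {
    }

    // Collects the range [0, n) into a list and returns how many times
    // the combiner was invoked along the way.
    public static int countCombinerCalls(final int n, final boolean parallel)
    {
        final AtomicInteger calls = new AtomicInteger();

        // Unlike the throwing combiner in the question, this one is a valid,
        // associative combiner; it merely records that it was called.
        final Collector<Integer, ?, List<Integer>> counting = Collector.of(
            ArrayList::new,
            List::add,
            (left, right) -> {
                calls.incrementAndGet();
                left.addAll(right);
                return left;
            });

        final IntStream source = IntStream.range(0, n);
        final List<Integer> result
            = (parallel ? source.parallel() : source).boxed().collect(counting);

        if (result.size() != n)
            throw new IllegalStateException("lost elements: " + result.size());
        return calls.get();
    }

    public static void main(final String... args)
    {
        System.out.println("sequential: " + countCombinerCalls(100_000, false));
        System.out.println("parallel:   " + countCombinerCalls(100_000, true));
    }
}
```

On current JDKs the sequential count should be zero, while the parallel count depends on how the source happens to be split.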

There is nothing in the specification that guarantees this, however. A Collector is an interface that makes requirements on its implementations, and there are no exemptions granted for sequential streams. Personally, I find it difficult to imagine why sequential pipeline evaluation might need to call the combiner, but someone with more imagination than me might find a clever use for it, and implement it. The specification allows for it, and even though today's implementation doesn't do it, you still have to think about it.

This should not be surprising. The design center of the streams API is to support parallel execution on an equal footing with sequential execution. Of course, it is possible for a program to observe whether it is being executed sequentially or in parallel. But the design of the API is to support a style of programming that allows either.

If you're writing a collector and you find that it's impossible (or inconvenient, or difficult) to write an associative combiner function, leading you to want to restrict your stream to sequential execution, maybe this means you're heading in the wrong direction. It's time to step back a bit and think about approaching the problem a different way.

A common reduction-style operation that doesn't require an associative combiner function is called fold-left. The main characteristic is that the fold function is applied strictly left-to-right, proceeding one at a time. I'm not aware of a way to parallelize fold-left.
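As a rough illustration of what fold-left means, here is a minimal sketch written as a plain loop. The helper foldLeft and the class FoldLeftSketch are hypothetical names invented for this example; nothing like them exists in the Streams API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public final class FoldLeftSketch
{
    private FoldLeftSketch()
    {
    }

    // Hypothetical helper: applies f strictly left to right, one element
    // at a time, threading the running result through each call.
    public static <A, T> A foldLeft(final Iterable<T> items, final A initial,
        final BiFunction<A, ? super T, A> f)
    {
        A acc = initial;
        for (final T item : items)
            acc = f.apply(acc, item);
        return acc;
    }

    public static void main(final String... args)
    {
        final List<Integer> digits = Arrays.asList(1, 2, 3, 4);

        // Evaluates ((((0 * 10 + 1) * 10 + 2) * 10 + 3) * 10 + 4.
        System.out.println(foldLeft(digits, 0, (acc, d) -> acc * 10 + d)); // prints 1234
    }
}
```

Because each step depends on the result of the previous one, there is no independent partial result that another thread could compute ahead of time.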

When people try to contort collectors the way we've been talking about, they're usually looking for something like fold-left. The Streams API doesn't have direct API support for this operation, but it's pretty easy to write. For example, suppose you want to reduce a list of strings using this operation: repeat the first string and then append the second. It's pretty easy to demonstrate that this operation isn't associative:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE

Run sequentially, this produces the desired output:

aabaabcaabaabcdaabaabcaabaabcde

But when run in parallel, it might produce something like this:

aabaabccdde
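The non-associativity can also be checked directly by grouping the same three inputs in two different ways. This is a small sketch on immutable strings; the class AssociativityCheck and the method op are names chosen for this example only.

```java
public final class AssociativityCheck
{
    private AssociativityCheck()
    {
    }

    // The operation from the example: repeat the first string, then append the second.
    public static String op(final String first, final String second)
    {
        return first + first + second;
    }

    public static void main(final String... args)
    {
        final String leftGrouped = op(op("a", "b"), "c");  // "aabaabc"
        final String rightGrouped = op("a", op("b", "c")); // "aabbc"

        System.out.println(leftGrouped);
        System.out.println(rightGrouped);
        System.out.println(leftGrouped.equals(rightGrouped)); // prints false
    }
}
```

A parallel collect is free to group elements either way, which is why the result changes from run to run.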

Since it "works" sequentially, we could enforce this by calling sequential() and back this up by having the combiner throw an exception. In addition, the supplier must be called exactly once. There's no way to combine the intermediate results, so if the supplier is called twice, we're already in trouble. But since we "know" the supplier is called only once in sequential mode, most people don't worry about this. In fact, I've seen people write "suppliers" that return some existing object instead of creating a new one, in violation of the supplier contract.

In this use of the 3-arg form of collect(), we have two out of the three functions breaking their contracts. Shouldn't this be telling us to do things a different way?

The main work here is being done by the accumulator function. To accomplish a fold-style reduction, we can apply this function in a strict left-to-right order using forEachOrdered(). We have to do a bit of setup and finishing code before and after, but that's no problem:

StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());

Naturally, this works fine in parallel, though the performance benefits of running in parallel may be somewhat negated by the ordering requirements of forEachOrdered().
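If this pattern comes up more than once, the setup and finishing code could be wrapped in a small helper. The following is only a sketch: foldOrdered and OrderedFold are hypothetical names, not part of the Streams API, and the helper assumes the stream has an encounter order (as a stream from a List does).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

public final class OrderedFold
{
    private OrderedFold()
    {
    }

    // Hypothetical helper: creates one container, feeds every element into it
    // in encounter order via forEachOrdered(), and returns the container.
    public static <T, A> A foldOrdered(final Stream<T> stream, final Supplier<A> supplier,
        final BiConsumer<A, ? super T> accumulator)
    {
        final A container = supplier.get(); // called exactly once, no combiner needed
        stream.forEachOrdered(item -> accumulator.accept(container, item));
        return container;
    }

    public static void main(final String... args)
    {
        final List<String> list = Arrays.asList("a", "b", "c", "d", "e");

        final StringBuilder result = foldOrdered(list.parallelStream(), StringBuilder::new,
            (a, b) -> a.append(a.toString()).append(b));

        System.out.println(result); // prints aabaabcaabaabcdaabaabcaabaabcde
    }
}
```

The unsynchronized StringBuilder is acceptable here because forEachOrdered() specifies that the action for one element happens-before the action for the next, even on a parallel stream.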

In summary, if you find yourself wanting to do a mutable reduction but you're lacking an associative combiner function, leading you to restrict your stream to sequential execution, recast the problem as a fold-left operation and apply your accumulator function with forEachOrdered().
