与从迭代器创建的流的并行性 [英] Parallelism with Streams created from Iterators

查看:72
本文介绍了与从迭代器创建的流的并行性的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用流进行实验时,我遇到了以下我不太了解的行为.我从迭代器创建了并行流,我注意到它似乎没有表现出并行性.在下面的示例中,我在控制台上打印了两个并行流的计数器,一个是从迭代器创建的,另一个是从列表创建的.从列表创建的流表现出我期望的行为,该行为将以非连续的顺序打印计数器,但是从迭代器创建的流以连续的顺序打印计数器.我是错误地从迭代器创建并行流的吗?

Experimenting with streams I ran into the following behavior which I don't quite understand. I created a parallel stream from an iterator and I noticed that it did not seem to be exhibiting parallelism. In the below example I've printed a counter to the console for two parallel streams, one created from an iterator and the other from a list. The stream created from the list exhibited the behavior I expected which was to print the counter in non-sequential order but the stream created from the iterator printed the counter in sequential order. Am I creating the parallel stream from an iterator incorrectly?

    private static int counter = 0;

public static void main(String[] args) {
    List<Integer> lstr = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
    Iterator<Integer> iter = lstr.iterator();

    System.out.println("Iterator Stream: ");
    StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.IMMUTABLE | Spliterator.CONCURRENT), true).forEach(i -> {
        System.out.print(counter + " ");
        counter++;
    });

    counter = 0;
    System.out.println("\nList Stream: ");
    lstr.parallelStream().forEach(i -> {
        System.out.print(counter + " ");
        counter++;
    });

}

推荐答案

当前的实现将尝试通过缓冲值并将其分派给多个线程来并行化由迭代器生成的流,但是只有在该流为足够长了.将您的列表增加到10000个元素,您应该会看到并行性.

The current implementation will try to parallelize a stream produced form an iterator by buffering up the values and dispatching them to several threads but it only kicks in if the stream is long enough. Increase your list to 10000 elements and you should see parallelism.

列表很大,如果您将元素收集到按线程分组的映射中,则可能更容易查看线程如何分配元素.将.forEach替换为.collect(Collectors.groupingBy(x -> Thread.currentThread().getName(), Collectors.counting()))

With a large list, it might be easier to see how elements distributed by thread if you collect into a map grouped by thread. Replace your .forEach with .collect(Collectors.groupingBy(x -> Thread.currentThread().getName(), Collectors.counting()))

这篇关于与从迭代器创建的流的并行性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆