在并行流上调用顺序会使所有先前的操作顺序进行 [英] Calling sequential on parallel stream makes all previous operations sequential

查看:156
本文介绍了在并行流上调用顺序会使所有先前的操作顺序进行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一组重要的数据,并希望调用缓慢但干净的方法,而不是在第一个方法的结果上调用带有副作用的快速方法。我对中间结果不感兴趣,所以我不想收集它们。

I've got a significant set of data, and want to call slow, but clean method and than call fast method with side effects on result of the first one. I'm not interested in intermediate results, so i would like not to collect them.

明显的解决方案是创建并行流,慢速调用,再次生成流顺序,并拨打快速电话。问题是,所有代码都在单线程中执行,没有实际的并行性。

Obvious solution is to create parallel stream, make slow call , make stream sequential again, and make fast call. The problem is, ALL code executing in single thread, there is no actual parallelism.

示例代码:

@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .sequential()
            .map(Function.identity())//some fast operation, but must be in single thread
            .collect(Collectors.toSet())
    ).get();
    System.out.println(threads);
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}

private String slowOperation(int value)
{
    try
    {
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return Thread.currentThread().getName();
}

如果我删除顺序,代码按预期执行,但显然,非并行操作将在多个线程中调用。

If I remove sequential, code executing as expected, but, obviously, non-parallel operation would be call in multiple threads.

你能推荐一些关于这种行为的引用,或者某种方式避免临时收集?

Could you recommend some references about such behavior, or maybe some way to avoid temporary collections?

推荐答案

parallel()切换流 sequential()在最初的Stream API设计中工作,但是引起了很多问题,最后实现了已更改,因此它只是打开和关闭并行标志整个管道。目前的文档确实含糊不清,但在 Java-9

Switching the stream from parallel() to sequential() worked in the initial Stream API design, but caused many problems and finally the implementation was changed, so it just turns the parallel flag on and off for the whole pipeline. The current documentation is indeed vague, but it was improved in Java-9:


流管道按顺序或并行执行,具体取决于模式调用终端操作的流。可以使用 BaseStream.isParallel()方法确定流的顺序或并行模式,并且可以使用 BaseStream修改流的模式。 sequential() BaseStream.parallel()操作。最近的顺序或并行模式设置适用于整个流管道的执行。

The stream pipeline is executed sequentially or in parallel depending on the mode of the stream on which the terminal operation is invoked. The sequential or parallel mode of a stream can be determined with the BaseStream.isParallel() method, and the stream's mode can be modified with the BaseStream.sequential() and BaseStream.parallel() operations. The most recent sequential or parallel mode setting applies to the execution of the entire stream pipeline.

至于你的问题,你可以收集所有东西进入中间列表并启动新的顺序管道:

As for your problem, you can collect everything into intermediate List and start new sequential pipeline:

new Random().ints(100).boxed()
        .parallel()
        .map(this::slowOperation)
        .collect(Collectors.toList())
        // Start new stream here
        .stream()
        .map(Function.identity())//some fast operation, but must be in single thread
        .collect(Collectors.toSet());

这篇关于在并行流上调用顺序会使所有先前的操作顺序进行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆