Split Java stream into two lazy streams without terminal operation


Problem description


I understand that in general Java streams do not split. However, we have an involved and lengthy pipeline, at the end of which we have two different types of processing that share the first part of the pipeline.


Due to the size of the data, storing the intermediate stream product is not a viable solution. Neither is running the pipeline twice.
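
For concreteness, a minimal sketch of the two workarounds being ruled out (expensiveSource(), processA, and processB are made-up placeholders for the real pipeline and its two kinds of processing):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NonViableWorkarounds
{
    // stands in for the involved, lengthy shared part of the pipeline
    static Stream<String> expensiveSource() { return Stream.of("a", "b", "c"); }
    static void processA(String record) { System.out.println("A " + record); }
    static void processB(String record) { System.out.println("B " + record); }

    public static void main(String[] args)
    {
        // Workaround 1: materialize the intermediate result -- ruled out, the data is too large to store.
        List<String> intermediate = expensiveSource().collect(Collectors.toList());
        intermediate.forEach(NonViableWorkarounds::processA);
        intermediate.forEach(NonViableWorkarounds::processB);

        // Workaround 2: run the whole pipeline twice -- ruled out, the shared part is too expensive to repeat.
        expensiveSource().forEach(NonViableWorkarounds::processA);
        expensiveSource().forEach(NonViableWorkarounds::processB);
    }
}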


Basically, what we are looking for is a solution that is an operation on a stream that yields two (or more) streams that are lazily filled and able to be consumed in parallel. By that, I mean that if stream A is split into streams B and C, when streams B and C consume 10 elements, stream A consumes and provides those 10 elements, but if stream B then tries to consume more elements, it blocks until stream C also consumes them.

Is there any pre-made solution for this problem, or any library we can look at? If not, where would we start to look if we want to implement this ourselves? Or is there a compelling reason not to implement it at all?

Recommended answer


You can implement a custom Spliterator in order to achieve such behavior. We will split your streams into the common "source" and the different "consumers". The custom spliterator then forwards the elements from the source to each consumer. For this purpose, we will use a BlockingQueue (see this question).
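
As background, here is a minimal sketch of the hand-off a BlockingQueue provides (the class and element values are made up, not part of the answer): the consumer blocks in take() until the producer has handed over the next element. The spliterator below layers its own turn-taking between several consumers on top of such a queue.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueHandOffSketch
{
    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // producer thread: forwards elements into the queue as the source yields them
        new Thread(() ->
        {
            for (String s : new String[] { "1", "2", "3" })
            {
                queue.offer(s);
            }
        }).start();

        // consumer side: take() blocks until the producer has provided the next element
        for (int i = 0; i < 3; i++)
        {
            System.out.println(queue.take());
        }
    }
}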

Note that the difficult part here is not the spliterator/stream, but the syncing of the consumers around the queue, as the comments on your question already indicate. Still, however you implement the syncing, the Spliterator helps you hook streams up to it.

import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    // A cheap terminal operation (count()) drives the forking spliterator,
    // which pumps the source elements into the shared queue for the consumers.
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

private static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T>   sourceSpliterator;

    private BlockingQueue<T> queue      = new LinkedBlockingQueue<>();

    private AtomicInteger    nextToTake = new AtomicInteger(0);
    private AtomicInteger    processed  = new AtomicInteger(0);

    // written by the forwarding thread, read by the consumer threads
    private volatile boolean sourceDone;
    private int              consumerCount;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();
        consumerCount = consumers.length;

        // start one thread per consumer; each consumer gets its own
        // ForkedConsumer spliterator as a view onto the shared queue
        for (int i = 0; i < consumers.length; i++)
        {
            int index = i;
            Consumer<Stream<T>> consumer = consumers[i];
            new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    consumer.accept(StreamSupport.stream(new ForkedConsumer(index), false));
                }
            }).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        // The given action is ignored on purpose: the next source element is
        // forwarded into the shared queue for the consumers instead.
        sourceDone = !sourceSpliterator.tryAdvance(queue::offer);
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private int index;

        private ForkedConsumer(int index)
        {
            super(Long.MAX_VALUE, 0);

            this.index = index;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            // busy-wait until it's this consumer's turn to take the next element
            while (!nextToTake.compareAndSet(index, index + 1))
            {
            }
            // busy-wait for the next element to show up in the queue
            T element;
            while ((element = queue.peek()) == null)
            {
                if (sourceDone)
                {
                    // element is null and there won't be any more, so "terminate" this sub-stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(element);

            if (consumerCount == processed.incrementAndGet())
            {
                // this consumer was the last one to process the current element:
                // remove it from the queue and start the next round
                queue.poll();
                processed.set(0);
                nextToTake.set(0);
            }

            return true;
        }
    }
}


With the approach used, the consumers work on each element in parallel, but wait for each other before starting on the next element.

Known issue

If one of the consumers is "shorter" than the others (e.g. because it calls limit()), it will also stop the other consumers and leave their threads hanging.
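
A hypothetical illustration of that failure mode (not from the original answer), reusing streamForked from above: the second consumer takes only one element, so after element "1" no round can ever complete again; the first consumer still receives element "2" but then busy-waits forever for its turn at element "3", and its thread never terminates.

streamForked(Stream.of("1", "2", "3"),
             source -> source.forEach(word -> System.out.println("unbounded " + word)),
             source -> source.limit(1).forEach(word -> System.out.println("limited   " + word)));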

Example

public static void sleep(long millis)
{
    try { Thread.sleep((long) (Math.random() * 30 + millis)); } catch (InterruptedException e) { }
}

streamForked(Stream.of("1", "2", "3", "4", "5"),
             source -> source.map(word -> { sleep(50); return "fast   " + word; }).forEach(System.out::println),
             source -> source.map(word -> { sleep(300); return "slow      " + word; }).forEach(System.out::println),
             source -> source.map(word -> { sleep(50); return "2fast        " + word; }).forEach(System.out::println));

fast   1
2fast        1
slow      1
fast   2
2fast        2
slow      2
2fast        3
fast   3
slow      3
fast   4
2fast        4
slow      4
2fast        5
fast   5
slow      5
