Split Java stream into two lazy streams without terminal operation
Question
I understand that in general Java streams do not split. However, we have an involved and lengthy pipeline, at the end of which we have two different types of processing that share the first part of the pipeline.
Due to the size of the data, storing the intermediate stream product is not a viable solution. Neither is running the pipeline twice.
Basically, what we are looking for is a solution that is an operation on a stream that yields two (or more) streams that are lazily filled and able to be consumed in parallel. By that, I mean that if stream A is split into streams B and C, when streams B and C consume 10 elements, stream A consumes and provides those 10 elements, but if stream B then tries to consume more elements, it blocks until stream C also consumes them.
Is there any pre-made solution for this problem or any library we can look at? If not, where would we start to look if we want to implement this ourselves? Or is there a compelling reason not to implement this at all?
Recommended answer
You can implement a custom Spliterator to achieve such behavior. We split your stream into the common "source" and the different "consumers". The custom spliterator then forwards the elements from the source to each consumer. For this purpose, we use a BlockingQueue (see this question).
Note that the difficult part here is not the spliterator or the stream, but synchronizing the consumers around the queue, as the comments on your question already indicate. Still, however you implement the synchronization, a Spliterator is what lets you plug it into ordinary streams.
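To illustrate the forwarding idea on its own, here is a minimal sketch that uses a different synchronization scheme from the answer's code (an assumption, not the answer's method): the calling thread runs the shared pipeline once and puts every element into one bounded ArrayBlockingQueue per consumer, and each consumer reads its own queue as a lazy Stream on its own thread. QueueFork, fork, its capacity parameter and the END marker are hypothetical names. Because put() blocks while a queue is full, a fast consumer can only get roughly capacity elements ahead of the slowest one, which is close to the blocking behavior described in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class: one bounded queue per consumer.
public class QueueFork
{
    // end-of-stream marker put into every queue after the last element
    private static final Object END = new Object();

    // Runs `source` once on the calling thread and feeds each element to every consumer.
    // Elements must be non-null (ArrayBlockingQueue rejects null).
    @SafeVarargs
    public static <T> void fork(Stream<T> source, int capacity, Consumer<Stream<T>>... consumers)
    {
        List<BlockingQueue<Object>> queues = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (Consumer<Stream<T>> consumer : consumers)
        {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
            queues.add(queue);
            // each consumer runs its own pipeline on its own thread
            Thread thread = new Thread(() -> consumer.accept(queueStream(queue)));
            threads.add(thread);
            thread.start();
        }
        // put() blocks while a consumer's queue is full, which throttles the shared pipeline
        source.forEachOrdered(element -> queues.forEach(queue -> put(queue, element)));
        queues.forEach(queue -> put(queue, END));
        for (Thread thread : threads)
        {
            try { thread.join(); }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    // Exposes a queue as a lazy, sequential Stream that ends at the END marker.
    private static <T> Stream<T> queueStream(BlockingQueue<Object> queue)
    {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
        {
            private boolean done;

            @Override
            public boolean tryAdvance(Consumer<? super T> action)
            {
                if (done)
                {
                    return false;
                }
                Object next;
                try { next = queue.take(); } // blocks until the producer delivers
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                if (next == END)
                {
                    done = true;
                    return false;
                }
                @SuppressWarnings("unchecked")
                T element = (T) next;
                action.accept(element);
                return true;
            }
        }, false);
    }

    private static void put(BlockingQueue<Object> queue, Object value)
    {
        try { queue.put(value); }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Like the answer's spliterator, this sketch hangs if a consumer stops early (for example with limit()) or throws, because that consumer's queue fills up and the producer then blocks on put() forever.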
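import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// the enclosing class name is arbitrary
public class StreamFork
{
    @SafeVarargs
    public static <T> void streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        // count() only drives the traversal: the forking spliterator hands every
        // element to the queue and never to this outer stream
        StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
    }

    private static class ForkingSpliterator<T>
        extends AbstractSpliterator<T>
    {
        private final Spliterator<T> sourceSpliterator;
        // capacity 1: the source waits until all consumers are done with the current
        // element, so intermediate results are never buffered (null elements are rejected)
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>(1);
        private final AtomicInteger nextToTake = new AtomicInteger(0);
        private final AtomicInteger processed = new AtomicInteger(0);
        // volatile, so that the consumer threads see the end of the source
        private volatile boolean sourceDone;
        private final int consumerCount;

        @SafeVarargs
        private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
        {
            super(Long.MAX_VALUE, 0);
            sourceSpliterator = source.spliterator();
            consumerCount = consumers.length;
            for (int i = 0; i < consumers.length; i++)
            {
                int index = i;
                Consumer<Stream<T>> consumer = consumers[i];
                // each consumer runs its own pipeline on its own thread
                new Thread(() -> consumer.accept(StreamSupport.stream(new ForkedConsumer(index), false)))
                    .start();
            }
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            // move one element from the source into the queue, waiting while it is full
            boolean advanced = sourceSpliterator.tryAdvance(element -> {
                try
                {
                    queue.put(element);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            });
            sourceDone = !advanced;
            return advanced;
        }

        private class ForkedConsumer
            extends AbstractSpliterator<T>
        {
            private final int index;

            private ForkedConsumer(int index)
            {
                super(Long.MAX_VALUE, 0);
                this.index = index;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action)
            {
                // take the next element when it's our turn (busy-wait)
                while (!nextToTake.compareAndSet(index, index + 1))
                {
                }
                T element;
                while ((element = queue.peek()) == null)
                {
                    // the source puts every element before it sets sourceDone, so once the
                    // flag is set, an empty queue means that no more elements will arrive
                    if (sourceDone && queue.peek() == null)
                    {
                        // no element now and none to come, so "terminate" this sub-stream
                        return false;
                    }
                }
                // push to the consumer pipeline
                action.accept(element);
                if (consumerCount == processed.incrementAndGet())
                {
                    // the last consumer to finish removes the element and starts the next round
                    queue.poll();
                    processed.set(0);
                    nextToTake.set(0);
                }
                return true;
            }
        }
    }
}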
With this approach, the consumers work on each element in parallel, but wait for each other before starting on the next element. While waiting they busy-spin, so waiting consumers burn CPU time.
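The waiting in ForkingSpliterator is done by spinning on nextToTake and on the queue. As a sketch of an alternative (not the answer's code), java.util.concurrent.CyclicBarrier can give the same round-by-round behavior by blocking instead: its optional barrier action runs in the last thread to arrive, before the others are released, so it can pull the next element from the shared pipeline, and no separate source thread or queue is needed. BarrierFork, fork and END are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class: lockstep fork driven by a CyclicBarrier.
public class BarrierFork
{
    // marks that the source is exhausted
    private static final Object END = new Object();

    @SafeVarargs
    public static <T> void fork(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        Spliterator<T> spliterator = source.spliterator();
        // the element all consumers are currently working on, or END
        AtomicReference<Object> current = new AtomicReference<>();
        // pulls the next element from the shared pipeline
        Runnable advance = () -> {
            if (!spliterator.tryAdvance(current::set))
            {
                current.set(END);
            }
        };
        // the last consumer to arrive runs `advance` before the barrier releases everyone,
        // so the source spliterator is only ever touched by one thread at a time
        CyclicBarrier barrier = new CyclicBarrier(consumers.length, advance);
        advance.run(); // load the first element before any consumer starts

        List<Thread> threads = new ArrayList<>();
        for (Consumer<Stream<T>> consumer : consumers)
        {
            Spliterator<T> forked = new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
            {
                @Override
                public boolean tryAdvance(Consumer<? super T> action)
                {
                    Object element = current.get();
                    if (element == END)
                    {
                        return false;
                    }
                    @SuppressWarnings("unchecked")
                    T value = (T) element;
                    action.accept(value);
                    try
                    {
                        // block (no spinning) until every consumer is done with this element
                        barrier.await();
                    }
                    catch (InterruptedException | BrokenBarrierException e)
                    {
                        if (e instanceof InterruptedException) Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                    return true;
                }
            };
            Thread thread = new Thread(() -> consumer.accept(StreamSupport.stream(forked, false)));
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads)
        {
            try { thread.join(); }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }
}
```

The "shorter consumer" problem applies here as well: a consumer that stops early never reaches the barrier again, so the others wait for it forever.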
Known issue
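If one of the consumers is "shorter" than the others (e.g. because it calls limit()), it stops taking its turns, so the other consumers wait for it forever and their threads are left hanging. The same happens if a consumer's pipeline throws an exception.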
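One possible mitigation (an assumption, not part of the answer): after a consumer's pipeline finishes, keep pulling and discarding the remaining elements from its spliterator, so that it still takes part in every round. In ForkingSpliterator this would mean running such a drain loop in each consumer thread after consumer.accept(...) returns, and ForkedConsumer would also need to return false immediately once it has finished, because as written, calling its tryAdvance again after it returned false spins forever on nextToTake. The hypothetical helper consumeAndDrain shows the idea on a plain spliterator; the short consumer's thread then stays busy until the source is exhausted.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class illustrating the "drain after a short consumer" idea.
public class DrainDemo
{
    public static <T> void consumeAndDrain(Spliterator<T> spliterator, Consumer<Stream<T>> consumer)
    {
        // let the consumer's pipeline take as many elements as it wants
        consumer.accept(StreamSupport.stream(spliterator, false));
        // then pull and discard the rest; assumes tryAdvance keeps returning
        // false once the spliterator is exhausted
        while (spliterator.tryAdvance(element -> { }))
        {
            // nothing to do: the element is dropped
        }
    }

    public static void main(String[] args)
    {
        Iterator<String> source = Arrays.asList("a", "b", "c", "d").iterator();
        consumeAndDrain(Spliterators.spliteratorUnknownSize(source, 0),
            stream -> stream.limit(2).forEach(System.out::println));
        // prints a, b, then "source exhausted: true"
        System.out.println("source exhausted: " + !source.hasNext());
    }
}
```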
Example
// put these in the same class as streamForked
// sleeps for millis plus a random jitter of up to 30 ms
public static void sleep(long millis)
{
    try { Thread.sleep((long) (Math.random() * 30 + millis)); }
    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}

public static void main(String[] args)
{
    streamForked(Stream.of("1", "2", "3", "4", "5"),
        source -> source.map(word -> { sleep(50); return "fast " + word; }).forEach(System.out::println),
        source -> source.map(word -> { sleep(300); return "slow " + word; }).forEach(System.out::println),
        source -> source.map(word -> { sleep(50); return "2fast " + word; }).forEach(System.out::println));
}
Possible output (the order within a round can vary from run to run):
fast 1
2fast 1
slow 1
fast 2
2fast 2
slow 2
2fast 3
fast 3
slow 3
fast 4
2fast 4
slow 4
2fast 5
fast 5
slow 5