How to interconnect a non-parallel stream with a parallel stream (one producer, multiple consumers)


Question

I am trying to create a one-producer, multiple-consumers model with streams in Java 8. I am reading and processing data from a DB resource, and I want to process it in a streaming fashion (I cannot read the whole resource into memory).

The source has to be read on a single thread (the cursor is not thread-safe), and reading is fast. The processing of each data chunk, which is a heavy operation, can then run in parallel.

I haven't found out how to join (interconnect) a non-parallel stream with parallel stream processing. Is there a way to do this with the Java 8 Stream API?

Code example:

This iterator has to run on a single thread because the cursor is not thread-safe.

class SimpleIterator<Data> implements Iterator<Data> {

    private volatile Cursor cursor;

    public SimpleIterator(Cursor cursor) {
        this.cursor = cursor;
    }

    @Override
    public boolean hasNext() {
        return cursor.hasNext();
    }

    @Override
    public Data next() {
        return cursor.next();
    }
}

// Create the non-parallel stream
SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // parallel set to false

// Processing of each item should run in parallel
resultStream.parallel().forEach(data -> processData(data));

public void processData(Data data) {
    // heavy operation
}

But if I set the stream to parallel before calling forEach, then the whole stream is parallel and the iterator is also called from multiple threads. Is there a way to interconnect these two streams in Java 8, or do I have to create a queue that feeds data from the single-threaded producer stream to the parallel stream?

Recommended answer

I am working on a problem where I need to do a full outer join on two streams. The problems appear to be similar. What I do is insert two blocking queues to buffer my input. I think you can do something similar with one blocking queue to split a single stream into multiple streams without parallelizing the source stream.
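The single-queue idea can also be sketched without the Stream API at all, as a plain producer/consumer hand-off. The following is a minimal sketch under stated assumptions, not the answer's code: the class name QueueFanOut, the method process, the queue capacity of 64, and the POISON sentinel are all hypothetical names and values chosen for illustration. The source iterator is only ever touched by the calling thread, while the heavy work runs on a fixed pool of worker threads.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class QueueFanOut {
    // Hypothetical end-of-input marker; a BlockingQueue does not accept null.
    private static final Object POISON = new Object();

    /** Reads source on the calling thread only; applies work on the pool threads. */
    public static <T, R> List<R> process(Iterator<T> source, int workers, Function<T, R> work)
            throws Exception {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(64); // bounded: producer blocks when full
        ConcurrentLinkedQueue<R> results = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < workers; i++) {
                futures.add(pool.submit(() -> {
                    while (true) {
                        Object item = queue.take(); // blocks until an item arrives
                        if (item == POISON) {
                            return null; // this worker is done
                        }
                        @SuppressWarnings("unchecked")
                        T value = (T) item;
                        results.add(work.apply(value));
                    }
                }));
            }
            // Single-threaded producer: the only code that touches the source.
            while (source.hasNext()) {
                queue.put(source.next());
            }
            // One marker per worker so that every worker terminates.
            for (int i = 0; i < workers; i++) {
                queue.put(POISON);
            }
            for (Future<?> f : futures) {
                f.get(); // wait for completion and surface worker exceptions
            }
        } finally {
            pool.shutdownNow(); // also interrupts workers if the producer failed
        }
        return new ArrayList<>(results);
    }
}
```

A call might look like QueueFanOut.process(iterator, 4, data -> heavyOperation(data)), where heavyOperation is a placeholder for the real work. Results arrive in no particular order, and the sketch does not handle the case where every worker fails while the producer is still blocked on a full queue.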

The solution I propose can be found below. I have not yet tested my solution for joining two streams, so I am not certain that this works. The AbstractSpliterator class has an implementation of trySplit; the comments on trySplit are informative. The final method of the class constructs a parallelizable stream from the spliterator implementation.
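To make the remark about trySplit more concrete, here is a small sketch of how the inherited trySplit relates to tryAdvance. It assumes the behavior described in the JDK documentation for Spliterators.AbstractSpliterator, namely that trySplit obtains its elements by calling tryAdvance on the same spliterator and hands them off as a separate prefix spliterator. The names TrySplitDemo, Counting, and splitAndDrain are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

public class TrySplitDemo {
    /** Emits 0..limit-1 and records the name of each thread that calls tryAdvance. */
    public static class Counting extends Spliterators.AbstractSpliterator<Integer> {
        private final int limit;
        private int next;
        public final List<String> callers = new ArrayList<>();

        public Counting(int limit) {
            // Size reported as unknown, as it would be for a database cursor.
            super(Long.MAX_VALUE, Spliterator.ORDERED);
            this.limit = limit;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Integer> action) {
            callers.add(Thread.currentThread().getName());
            if (next >= limit) {
                return false;
            }
            action.accept(next++);
            return true;
        }
    }

    /** Splits once, then drains the prefix and the remainder; returns every element seen. */
    public static List<Integer> splitAndDrain(Counting source) {
        List<Integer> seen = new ArrayList<>();
        // The inherited trySplit pulls a batch of elements through tryAdvance.
        Spliterator<Integer> prefix = source.trySplit();
        if (prefix != null) {
            prefix.forEachRemaining(seen::add);
        }
        source.forEachRemaining(seen::add);
        return seen;
    }
}
```

In this single-threaded demo every tryAdvance call happens on the thread that invoked trySplit or forEachRemaining. In a real parallel pipeline the framework decides which worker thread makes those calls, so the sketch only illustrates the batching mechanism and makes no claim about thread affinity.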

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> {
    // End-of-stream marker. A BlockingQueue rejects null, so a dedicated sentinel object is used.
    private static final Object EOS = new Object();

    private final BlockingQueue<Object> queue;
    private final Thread thread;
    // Set once the marker has been seen, so that later calls do not block on an empty queue.
    private boolean finished;

    // An implementation of Runnable that fills the queue from the source stream on one thread.
    private class Filler implements Runnable {
        private final Stream<T> stream;
        private final BlockingQueue<Object> queue;

        private Filler(Stream<T> stream, BlockingQueue<Object> queue) {
            this.stream = stream;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                stream.forEach(x -> {
                    try {
                        // Blocks if the queue is full
                        queue.put(x);
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("Interrupted while filling the queue", e);
                    }
                });
            } finally {
                // Stream is drained (or failed): always put the end-of-stream marker.
                try {
                    queue.put(EOS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) {
        super(estSize, characteristics);
        queue = new ArrayBlockingQueue<>(1024);
        // Fill the queue from a separate thread (may want to externalize this).
        thread = new Thread(new Filler(srcStream, queue));
        thread.start();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (finished) {
            return false;
        }
        try {
            Object value = queue.take(); // waits (blocks) for entries in queue

            // If the end-of-stream marker is found, return false to signal
            // that the stream is finished.
            if (value == EOS) {
                finished = true;
                return false;
            }
            // Accept the next value.
            @SuppressWarnings("unchecked")
            T element = (T) value;
            action.accept(element);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = true;
            return false;
        }
    }

    public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) {
        Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream);
        return StreamSupport.stream(spliterator, true);
    }
}
