Buffered Background InputStream Implementations

Question

I've written background InputStream (and OutputStream) implementations that wrap other streams, and read ahead on a background thread, primarily allowing decompression/compression to happen in a different thread from the processing of the decompressed stream.

It's a fairly standard producer/consumer model.

This seems like an easy way to make good use of multi-core CPUs with simple processes that read, process, and write data, allowing for more efficient use of both CPU and disk resources. Perhaps 'efficient' isn't the best word, but it provides higher utilisation and, of more interest to me, reduced runtimes, compared to reading directly from a ZipInputStream and writing directly to a ZipOutputStream.

I'm happy to post the code, but my question is whether I'm reinventing something readily available in existing (and more heavily exercised) libraries?

Edit - posting code...

My code for the BackgroundInputStream is below (the BackgroundOutputStream is very similar), but there are aspects of it that I'd like to improve.

1. It looks like I'm working far too hard to pass buffers back and forth.
2. If the calling code throws away references to the BackgroundInputStream, the backgroundReaderThread will hang around forever.
3. Signalling eof needs improving.
4. Exceptions should be propagated to the foreground thread.
5. I'd like to allow using a thread from a provided Executor (a sketch of one possible approach follows the code).
6. The close() method should signal the background thread, and shouldn't close the wrapped stream, as the wrapped stream should be owned by the background thread that reads from it.
7. Doing silly things like reading after closing should be catered for appropriately.


    package nz.co.datacute.io;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class BackgroundInputStream extends InputStream {
        private static final int DEFAULT_QUEUE_SIZE = 1;
        private static final int DEFAULT_BUFFER_SIZE = 64*1024;
        private final int queueSize;
        private final int bufferSize;
        private volatile boolean eof = false;
        private LinkedBlockingQueue<byte[]> bufferQueue;
        private final InputStream wrappedInputStream;
        private byte[] currentBuffer;
        private volatile byte[] freeBuffer;
        private int pos;
    
        public BackgroundInputStream(InputStream wrappedInputStream) {
            this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
        }
    
        public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
            this.wrappedInputStream = wrappedInputStream;
            this.queueSize = queueSize;
            this.bufferSize = bufferSize;
        }
    
        @Override
        public int read() throws IOException {
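            // Lazily create the queue and start the background reader thread on the first read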
            if (bufferQueue == null) {
                bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
                BackgroundReader backgroundReader = new BackgroundReader();
                Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
                backgroundReaderThread.start();
            }
            if (currentBuffer == null) {
                try {
                    if ((!eof) || (bufferQueue.size() > 0)) {
                        currentBuffer = bufferQueue.take();
                        pos = 0;
                    } else {
                        return -1;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            int b = currentBuffer[pos++] & 0xFF; // mask so bytes 0x80-0xFF come back as 128-255 rather than negative values (which callers would mistake for EOF)
            if (pos == currentBuffer.length) {
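                // Current buffer exhausted: hand it back for reuse and fetch a new one on the next read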
                freeBuffer = currentBuffer;
                currentBuffer = null;
            }
            return b;
        }
    
        @Override
        public int available() throws IOException {
            if (currentBuffer == null) return 0;
            return currentBuffer.length - pos; // only the bytes remaining in the current buffer can be read without blocking
        }
    
        @Override
        public void close() throws IOException {
            wrappedInputStream.close();
            currentBuffer = null;
            freeBuffer = null;
        }
    
        class BackgroundReader implements Runnable {
    
            @Override
            public void run() {
                try {
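                    // Fill buffers from the wrapped stream until EOF, reusing any buffer handed back by read()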
                    while (!eof) {
                        byte[] newBuffer;
                        if (freeBuffer != null) {
                            newBuffer = freeBuffer;
                            freeBuffer = null;
                        } else {
                            newBuffer = new byte[bufferSize];
                        }
                        int bytesRead = 0;
                        int writtenToBuffer = 0;
                        while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
                            writtenToBuffer += bytesRead;
                        }
                        if (writtenToBuffer > 0) {
                            if (writtenToBuffer < bufferSize) {
                                newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                            }
                            bufferQueue.put(newBuffer);
                        }
                        if (bytesRead == -1) {
                            eof = true;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
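
Regarding items 4 and 5 above, here is a minimal sketch, not the author's implementation, of one way a caller-supplied Executor could run the reader, with end-of-stream signalled by an empty "poison pill" buffer and any background IOException re-thrown on the foreground thread. Class and member names are illustrative:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch only: background reading on a supplied Executor, with exception propagation.
    public class ExecutorBackedInputStream extends InputStream {
        private static final byte[] EOF_MARKER = new byte[0];            // poison pill: signals end of stream
        private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>(2);
        private volatile IOException backgroundFailure;                  // set by the reader, thrown by read()
        private byte[] current;
        private int pos;

        public ExecutorBackedInputStream(final InputStream wrapped, Executor executor, final int bufferSize) {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        byte[] buf = new byte[bufferSize];
                        int n;
                        while ((n = wrapped.read(buf)) != -1) {
                            byte[] chunk = new byte[n];
                            System.arraycopy(buf, 0, chunk, 0, n);
                            queue.put(chunk);                            // blocks while the queue is full
                        }
                    } catch (IOException e) {
                        backgroundFailure = e;                           // hand the failure to the foreground thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        try { queue.put(EOF_MARKER); } catch (InterruptedException ignored) { }
                    }
                }
            });
        }

        @Override
        public int read() throws IOException {
            if (current == EOF_MARKER) {
                if (backgroundFailure != null) throw backgroundFailure;
                return -1;
            }
            if (current == null || pos == current.length) {
                try {
                    current = queue.take();                              // wait for the next buffer
                    pos = 0;
                } catch (InterruptedException e) {
                    throw new IOException("Interrupted while waiting for a buffer");
                }
                if (current == EOF_MARKER) {
                    if (backgroundFailure != null) throw backgroundFailure;
                    return -1;
                }
            }
            return current[pos++] & 0xFF;                                // return the byte as 0-255
        }
    }

This only illustrates the Executor hand-off and exception propagation; it doesn't address the other points (an abandoned stream, for instance, still leaves the producer blocked on a full queue).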
    

Solution

Sounds interesting. I've never run across anything that does this out of the box, but it makes perfect sense to try and use an idle core for the compression if it's available.

Perhaps you could make use of Commons I/O - it is a well-tested lib which could help handle some of the more boring stuff and let you focus on extending the cool parallel parts. Maybe you could even contribute your code to the Commons project ;-)
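
For comparison, the JDK's piped streams are one ready-made way to hand bytes from a producing thread to a consuming thread, though they use a single fixed pipe buffer rather than queued read-ahead buffers, so they behave differently from the code above. A rough sketch, assuming a GZIP-compressed source; names are illustrative:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    import java.util.zip.GZIPInputStream;

    public final class PipedDecompression {

        // Returns decompressed bytes, produced by a separate thread writing into a pipe.
        public static InputStream decompressInBackground(final InputStream compressed) throws IOException {
            final PipedOutputStream sink = new PipedOutputStream();
            PipedInputStream source = new PipedInputStream(sink, 64 * 1024);   // 64 KB pipe buffer
            Thread producer = new Thread(new Runnable() {
                public void run() {
                    try {
                        InputStream unzipped = new GZIPInputStream(compressed);
                        byte[] buf = new byte[8 * 1024];
                        int n;
                        while ((n = unzipped.read(buf)) != -1) {
                            sink.write(buf, 0, n);                             // blocks while the pipe is full
                        }
                    } catch (IOException e) {
                        e.printStackTrace();                                   // a real version would surface this to the reader
                    } finally {
                        try { sink.close(); } catch (IOException ignored) { }  // closing the pipe signals EOF
                    }
                }
            }, "Background decompression");
            producer.start();
            return source;
        }
    }

One caveat: exceptions on the writing thread are not propagated directly; if the writer dies without closing the pipe, the reader only sees a generic broken-pipe error.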
