Buffered Background InputStream Implementations
Question
I've written background InputStream (and OutputStream) implementations that wrap other streams and read ahead on a background thread, primarily allowing for decompression/compression to happen in a different thread from the processing of the decompressed stream. It's a fairly standard producer/consumer model.
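As a point of reference, the producer/consumer shape the streams use can be boiled down to a minimal, self-contained sketch. The class and names here (PcDemo, POISON, run) are illustrative only, not part of the posted code: one thread puts chunks onto a bounded blocking queue, the other takes them, and an empty-array sentinel signals the end.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PcDemo {
    // Reference-compared sentinel marking the end of production.
    static final int[] POISON = new int[0];

    static int run() throws InterruptedException {
        // A bounded queue gives backpressure: the producer blocks when it's full.
        LinkedBlockingQueue<int[]> queue = new LinkedBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) queue.put(new int[] { i });
                queue.put(POISON); // tell the consumer we're done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        int[] chunk;
        // take() blocks until a chunk (or the sentinel) is available.
        while ((chunk = queue.take()) != POISON) sum += chunk[0];
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 10
    }
}
```

The background stream wrappers follow the same shape, with byte[] buffers as the chunks and the wrapped stream as the producer's data source.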
This seems like an easy way to make good use of multi-core CPUs with simple processes that read, process, and write data, allowing for more efficient use of both CPU and disk resources. Perhaps 'efficient' isn't the best word, but it provides higher utilisation and, of more interest to me, reduced runtimes, compared to reading directly from a ZipInputStream and writing directly to a ZipOutputStream.

I'm happy to post the code, but my question is whether I'm reinventing something readily available in existing (and more heavily exercised) libraries?
Edit - posting code...
My code for the BackgroundInputStream is below (the BackgroundOutputStream is very similar), but there are aspects of it that I'd like to improve:

- It looks like I'm working far too hard to pass buffers back and forward.
- If the calling code throws away references to the BackgroundInputStream, the backgroundReaderThread will hang around forever.
- Signalling eof needs improving.
- Exceptions should be propagated to the foreground thread.
- I'd like to allow using a thread from a provided Executor.
- The close() method should signal the background thread, and shouldn't close the wrapped stream, as the wrapped stream should be owned by the background thread that reads from it.
- Doing silly things like reading after closing should be catered for appropriately.
package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {

    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;

    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) {
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public BackgroundInputStream(InputStream wrappedInputStream, int queueSize, int bufferSize) {
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    }

    @Override
    public int read() throws IOException {
        if (bufferQueue == null) {
            // Lazily start the background reader on the first read.
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        }
        if (currentBuffer == null) {
            try {
                if ((!eof) || (bufferQueue.size() > 0)) {
                    currentBuffer = bufferQueue.take();
                    pos = 0;
                } else {
                    return -1;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Mask to 0-255 so byte values >= 0x80 aren't returned as negatives
        // (which a caller would misread as EOF).
        int b = currentBuffer[pos++] & 0xFF;
        if (pos == currentBuffer.length) {
            freeBuffer = currentBuffer;
            currentBuffer = null;
        }
        return b;
    }

    @Override
    public int available() throws IOException {
        if (currentBuffer == null) return 0;
        // Only the unread remainder of the current buffer is available.
        return currentBuffer.length - pos;
    }

    @Override
    public void close() throws IOException {
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    }

    class BackgroundReader implements Runnable {
        @Override
        public void run() {
            try {
                while (!eof) {
                    byte[] newBuffer;
                    if (freeBuffer != null) {
                        // Reuse the buffer the foreground thread has finished with.
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                    } else {
                        newBuffer = new byte[bufferSize];
                    }
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    // Keep reading until the buffer is full or the wrapped stream ends.
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1)
                            && (writtenToBuffer < bufferSize)) {
                        writtenToBuffer += bytesRead;
                    }
                    if (writtenToBuffer > 0) {
                        if (writtenToBuffer < bufferSize) {
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        }
                        bufferQueue.put(newBuffer);
                    }
                    if (bytesRead == -1) {
                        eof = true;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
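Two of the bullet points above, using a thread from a provided Executor and propagating exceptions to the foreground thread, could be addressed along these lines. This is a hedged sketch only, not the posted implementation: the class name ReadAheadStream, the pendingException field, and the empty-array EOF sentinel are all illustrative choices.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: read-ahead stream that borrows a thread from an Executor and
// rethrows any background IOException on the foreground thread.
public class ReadAheadStream extends InputStream {
    private static final byte[] EOF = new byte[0]; // sentinel chunk (compared by reference)
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(4);
    private final AtomicReference<IOException> pendingException = new AtomicReference<>();
    private byte[] current = new byte[0];
    private int pos;
    private boolean done;

    public ReadAheadStream(InputStream in, Executor executor) {
        executor.execute(() -> {
            try {
                byte[] buf = new byte[64 * 1024];
                int n;
                while ((n = in.read(buf)) != -1) {
                    if (n > 0) queue.put(Arrays.copyOf(buf, n));
                }
            } catch (IOException e) {
                pendingException.set(e); // hand the failure to the foreground thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always enqueue the sentinel so the consumer can't block forever.
                try { queue.put(EOF); } catch (InterruptedException ignored) { }
            }
        });
    }

    @Override
    public int read() throws IOException {
        if (done) return -1;
        while (pos == current.length) {
            try {
                current = queue.take();
                pos = 0;
            } catch (InterruptedException e) {
                throw new IOException("interrupted while waiting for buffer", e);
            }
            if (current == EOF) {
                done = true;
                IOException e = pendingException.getAndSet(null);
                if (e != null) throw e; // propagate the background failure
                return -1;
            }
        }
        return current[pos++] & 0xFF; // mask so read() never returns a spurious -1
    }
}
```

Because the sentinel is always enqueued in a finally block, the foreground thread wakes up even when the wrapped stream fails, and sees the stored exception instead of a silent EOF.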
Solution

Sounds interesting. I've never run across anything that does this out of the box, but it makes perfect sense to try to use an idle core for the compression if one is available.

Perhaps you could make use of Commons I/O - it is a well-tested lib which could help handle some of the more boring stuff and let you focus on extending the cool parallel parts. Maybe you could even contribute your code to the Commons project ;-)
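For what it's worth, a rough JDK-only approximation of the "read ahead on another thread" idea can be built from the standard piped streams: a pump thread copies the wrapped stream into a PipedOutputStream while the caller reads from the connected PipedInputStream. This is a sketch under stated assumptions (the helper name Pipes.readAhead is made up, and a real version would surface the pump's IOException to the reader rather than swallow it):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public final class Pipes {
    // Returns a stream whose bytes are pumped from 'in' by a background thread.
    public static InputStream readAhead(InputStream in, int bufferSize) throws IOException {
        PipedOutputStream sink = new PipedOutputStream();
        PipedInputStream source = new PipedInputStream(sink, bufferSize);
        Thread pump = new Thread(() -> {
            byte[] buf = new byte[8192];
            try {
                int n;
                while ((n = in.read(buf)) != -1) {
                    sink.write(buf, 0, n); // blocks when the pipe's buffer is full
                }
            } catch (IOException ignored) {
                // Sketch only: a real implementation should propagate this.
            } finally {
                // Closing the write end makes the reader see EOF (-1).
                try { sink.close(); } catch (IOException ignored) { }
            }
        }, "read-ahead pump");
        pump.setDaemon(true);
        pump.start();
        return source;
    }
}
```

The pipe's internal buffer plays the role of the blocking queue, though piped streams are known for coarse synchronization, so the hand-rolled buffer-queue approach may well perform better for large transfers.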