Android的PipedInputStream的食堂了数据 [英] Android PipedInputStream messes up data

查看:175
本文介绍了Android的PipedInputStream的食堂了数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

最近,我偶然到了Android的PipedInput / OutputStreams。

Recently I've stumbled into an issue with Android's PipedInput/OutputStreams.

我以前使用过他们,他们都为我好,但是诠释以下情形
如预期它没有发挥出来。

I have used them before and they have served me well, however int the following scenario it did not play out as expected.

我有一个类德codeFeed'与方法
- 读(字节[],INT长度):: INT
- 写(字节[],INT长度):: INT
- getProcessedStream():: InputStream的

I have a class 'DecodeFeed' with methods - read(byte[], int length) :: int - write(byte[], int length) :: int - getProcessedStream() :: InputStream

这个类的一个实例可以被传递给JNI函数,它可以任意调用
在读和写的方法来执行一些本地计算和写回。
如果我输出在Java的层从德codeFeed.write内的数据,结果,
如预期。为贯彻落实getProcessedStream()',我创建了一个PipedInputSteamRET的
被连接到的PipedOutputStream'管'。然后,写的方法调用pipe.write(缓冲)。
另一个线程现在调用'getProcessedStream()',并尝试读取它,你会读
一个正常的InputStream。

An instance of this class may be passed to a jni function, which may arbitrarily call the 'read' and 'write' methods to perform some local computation and then write back. If I output the data at the Java-Layer from within DecodeFeed.write, the results are, as expected. To implement 'getProcessedStream()', I created a PipedInputSteam 'ret' that is connected to a PipedOutputStream 'pipe'. The 'write' method then calls pipe.write(buffer). Another thread now calls 'getProcessedStream()' and tries to read it, as you would read a normal InputStream.

然而该数据被破坏。读出线程使用读取(缓冲液):: INT,其中该缓冲器大小= 1024。但是很多时候,该方法返回1作为读出的字节数。这似乎不太可能,因为德codeFeed'的'写'方法通常每次调用时写道:像4096字节。

However the data is broken. The reading thread uses read(buffer)::int, where the buffer size = 1024. However many a times, the method returns '1' as the number of bytes read. This seems unlikely, given that the 'write' method of 'DecodeFeed' usually writes something like 4096 bytes, every time it is called.

不管我多么涉足与包裹管道流进缓冲流,得到的数据
搞砸了,当通过管道发送。

No matter how much I dabbled with wrapping the piped stream into buffered streams, the data got messed up, when sent through the pipe.

我很想听到,如果有人有类似的问题,如果是这样,你是如何解决它们。
我会提供我自己的解决方案如下回答。

I would be interested to hear, if someone had similar problems and if so, how you fixed them. I will provide my own solution as an answer below.

推荐答案

要克服上述问题,我实现了我使用了Buffer类
带班的iostream子类的InputStream'结合使用。这种方法工作
对我来说,让我分享我的解决方案。不过,我认为PipedInput / OutputStreams应该表现方式基本相同。也许有一些疑难杂症,我没有得到,如果没有,我会认为这是一个严重的问题。

To overcome the issues indicated above, I've implemented a Buffer class that I'm using in conjunction with a class 'IOStream' that subclasses 'InputStream'. This approach has worked for me, so I share my solution. However I think that the PipedInput/OutputStreams should behave basically the same way. Maybe there is some gotcha that I didn't get, if not, I would consider this a serious issue.

package com.pack.io;

import java.io.IOException;
import java.io.InputStream;

public class IOStream extends InputStream {

public static final String TAG = "IOStream";

private Buffer local_buffer = null;


int write_pos = 0;

public IOStream() throws IOException {
    local_buffer = new Buffer();
}

/**
 * 
 * @param size_in_kb defaults to 16 with default constructor
 */
public IOStream(final int size_in_kb) {
    local_buffer = new Buffer(size_in_kb);
}

@Override
public void close() throws IOException{
    local_buffer.close();
}

@Override
public int read() throws IOException {
    return local_buffer.read();
}

@Override
public int read(byte[] buffer) throws IOException {
    return local_buffer.read(buffer);
}


@Override
public int read(byte[] buffer, int offset, int bytes) throws IOException {
    return local_buffer.read(buffer, offset, bytes);
}

public int write(byte[] buffer) throws IOException {
    return local_buffer.write(buffer);
}

public int write(final byte[] buffer, final int offset, final int bytes) throws IOException {
    return local_buffer.write(buffer, offset, bytes);
}

}


package com.pack.io;

public class Mutex {
private int mx_nb_resources = 1;
private int nb_resources = 0;
private final Object _mutex = new Object();

/**
 * Construct a mutex, where at max mx_nb_resources
 * are allowed access
 * @param mx_nb_resources defaults to 1 (using the default constructor)
 */
public Mutex(int mx_nb_resources) {
    this.mx_nb_resources = mx_nb_resources;
}

public Mutex() {}

public void lock() {
    synchronized (_mutex) {
        nb_resources++;
        if (nb_resources > mx_nb_resources) {
            try {
                _mutex.wait();
            } catch (InterruptedException e) {}
        }
    }
}

public void unlock() {
    synchronized (_mutex) {
        nb_resources--;
        _mutex.notify();
    }
}
}

package com.pack.io;

import java.io.IOException;

public class Buffer {

public static final String TAG = "Buffer";

private Object _mutex = new Object();
private Mutex _read_mutex = new Mutex();
private Mutex _write_mutex = new Mutex();
private byte[][] local_buffer = null;
private int local_buffer_size = 0;
private int mx_local_bufer_size = 0;
private int chunk_size = 1024;
private int nb_chunks = 16;

int write_pos = 0;
private volatile boolean done = false;

/**
 * Threadable Buffer class that may be written to and read from
 * - buffer size defaults to 16kb
 * - read-methods block, if not enough data is available
 * - write methods block, if buffer is full
 * - flush is a noop
 * - close will cause future read/writes to be noops, returning -1 
 */
public Buffer() {
    init();
}

/**
 * Default constructor sets buffersize to 16 kb
 * @param size_in_kb set buffersize as desired
 */
public Buffer(final int size_in_kb) {
    this.nb_chunks = size_in_kb;
    init();
}

private void init() {
    local_buffer = new byte[nb_chunks][chunk_size];
    mx_local_bufer_size = nb_chunks * chunk_size;
}

public void flush() {
    // Can't really do much here. In case a read/write thread is
    // waiting, it will be poked as soon as possible anyways.
    // It could be argued, that flush() should cause the 
    // cache to be emptied before accepting new writes,
    // however this seems impractical and unnecessary.
    // For what it's worth, this method is provided for the
    // case, where an subclass of Buffer wants to use it. 
}

public void close() throws IOException{
    synchronized (_mutex) {
        done = true;
        _mutex.notifyAll();
    }
}

public int read() throws IOException {
    byte[] buffer = new byte[1];
    read(buffer);
    return buffer[0];
}

public int read(byte[] buffer) throws IOException {
    return read(buffer, 0, buffer.length);
}

public int read(byte[] buffer, int offset, int bytes) throws IOException {
    synchronized (_mutex) {
        _read_mutex.lock();
        int bytes_read = 0;
        try {
            // block while not enough data is available
            while (local_buffer_size < bytes + offset && !done) {
                try {
                    _mutex.wait();
                } catch (InterruptedException e) {}
            }
            if (done) return -1;

            int pos = offset;

            for (int i = 0; i < bytes && pos < local_buffer_size; pos++, i++) {
                buffer[i] = local_buffer[pos / chunk_size][pos % chunk_size];
            }

            bytes_read = Math.min(bytes, pos - offset);

            // left shift
            for (int i = 0, j = bytes_read + offset; j < local_buffer_size; i++, j++) {
                local_buffer[i / chunk_size][i % chunk_size] = 
                        local_buffer[j / chunk_size][j % chunk_size];
            }

            return bytes_read;
        } finally {
            local_buffer_size -= (bytes_read);
            write_pos -= (bytes_read + offset);
            _mutex.notify(); // write() might be waiting until more space is available, so notify
            _read_mutex.unlock();
        }
    }
}

public int write(final byte[] buffer) throws IOException {
    return write(buffer, 0, buffer.length);
}

public int write(final byte[] buffer, final int offset, final int bytes) throws IOException {
    synchronized (_mutex) {
        _write_mutex.lock();
        try {
            // block while not enough room in the local_buffer
            while (local_buffer_size + bytes + offset > mx_local_bufer_size) {
                try {
                    _mutex.wait();
                } catch (InterruptedException e) {}
            }
            if (done) return -1;

            write_pos += offset;
            for (int i = 0; i < bytes; write_pos++, i++) {
                local_buffer[write_pos / chunk_size][write_pos % chunk_size]
                        = buffer[i];
            }

            return bytes;
        } finally {
            local_buffer_size += bytes;
            _mutex.notify(); // read() might be waiting for input, so notify
            _write_mutex.unlock();
        }
    }   
}
}

这篇关于Android的PipedInputStream的食堂了数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆