Read Parquet data from ByteArrayOutputStream instead of file

Question

I want to convert this code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class ParquetReaderUtils {

    // Parquet is the small holder class (List<SimpleGroup> + MessageType) from the linked post
    public static Parquet getParquetData(String filePath) throws IOException {
        List<SimpleGroup> simpleGroups = new ArrayList<>();
        // try-with-resources closes the reader even if reading a row group fails
        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(filePath), new Configuration()))) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            // The column IO depends only on the schema, so build it once
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            PageReadStore pages;
            while ((pages = reader.readNextRowGroup()) != null) {
                long rows = pages.getRowCount();
                RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));

                for (long i = 0; i < rows; i++) {
                    simpleGroups.add((SimpleGroup) recordReader.read());
                }
            }
            return new Parquet(simpleGroups, schema);
        }
    }
}

(from https://www.arm64.ca/post/reading-parquet-files-java/)

to take a ByteArrayOutputStream parameter instead of a filePath.

Is this possible? I don't see a ParquetStreamReader in org.apache.parquet.hadoop.

Any help is appreciated. I am trying to write a test app for Parquet data coming from Kafka, and writing each of many messages out to a file is rather slow.

Recommended answer

Without deeper testing, I would try this class (the content of the output stream must, of course, be a valid Parquet file). I added a streamId to make the processed byte array easier to identify: ParquetFileReader prints the instance's toString() if something goes wrong.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

public class ParquetStream implements InputFile
{
    private final String streamId;
    private final byte[] data;

    /** ByteArrayInputStream whose protected read position can be moved and queried. */
    private static class SeekableByteArrayInputStream extends ByteArrayInputStream
    {
        public SeekableByteArrayInputStream(byte[] buf)
        {
            super(buf);
        }

        public void setPos(int pos)
        {
            this.pos = pos;
        }

        public int getPos()
        {
            return this.pos;
        }
    }

    public ParquetStream(String streamId, ByteArrayOutputStream stream)
    {
        this.streamId = streamId;
        // Snapshot the bytes once; the reader may open several streams over them
        this.data = stream.toByteArray();
    }

    @Override
    public long getLength() throws IOException
    {
        return this.data.length;
    }

    @Override
    public SeekableInputStream newStream() throws IOException
    {
        return new DelegatingSeekableInputStream(new SeekableByteArrayInputStream(this.data))
        {
            @Override
            public void seek(long newPos) throws IOException
            {
                // Reject positions outside the buffer instead of silently truncating the long
                if (newPos < 0 || newPos > ParquetStream.this.data.length)
                {
                    throw new IOException("Cannot seek to " + newPos + " in " + ParquetStream.this);
                }
                ((SeekableByteArrayInputStream) this.getStream()).setPos((int) newPos);
            }

            @Override
            public long getPos() throws IOException
            {
                return ((SeekableByteArrayInputStream) this.getStream()).getPos();
            }
        };
    }

    @Override
    public String toString()
    {
        return "ParquetStream[" + streamId + "]";
    }
}
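Since the bytes must form a complete Parquet file, a cheap pre-check can catch truncated or non-Parquet Kafka payloads before they reach ParquetFileReader. The sketch below is a hypothetical helper (ParquetBytesCheck is not part of parquet-mr) and assumes plain, unencrypted files. It only checks the layout from the Parquet format specification: a leading 4-byte "PAR1" magic, then at the end a 4-byte little-endian footer length followed by "PAR1". It does not parse the footer, so a buffer that passes can still be rejected by the reader.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class ParquetBytesCheck {
    // Magic bytes at the start and end of a plain (unencrypted) Parquet file
    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    private ParquetBytesCheck() {}

    /** Returns true if data has leading/trailing PAR1 and a footer length that fits. */
    public static boolean looksLikeParquet(byte[] data) {
        int n = data.length;
        // Minimum layout: magic + footer length field + magic
        if (n < 2 * MAGIC.length + 4) {
            return false;
        }
        boolean head = Arrays.equals(Arrays.copyOfRange(data, 0, 4), MAGIC);
        boolean tail = Arrays.equals(Arrays.copyOfRange(data, n - 4, n), MAGIC);
        if (!head || !tail) {
            return false;
        }
        // The 4 bytes before the trailing magic hold the footer length (little-endian)
        int footerLength = ByteBuffer.wrap(data, n - 8, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        // The footer must fit between the leading magic and the length field
        return footerLength > 0 && footerLength <= n - 12;
    }

    /** Convenience overload for the ByteArrayOutputStream from the question. */
    public static boolean looksLikeParquet(ByteArrayOutputStream stream) {
        return looksLikeParquet(stream.toByteArray());
    }
}
```

If the check passes, the question's method could open the reader with ParquetFileReader.open(new ParquetStream("some-id", stream)) in place of ParquetFileReader.open(HadoopInputFile.fromPath(...)), where "some-id" is any label you choose; the row-group loop stays the same.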
