How to read and write Parquet files efficiently?
Question
I am working on a utility that reads multiple Parquet files at a time and writes them into a single output file.
The implementation is very straightforward: the utility reads the Parquet files from a directory, reads the Group records
from every file into a list, and then uses ParquetWriter to write all of these Groups into a single file.
After reading 600 MB it throws an out-of-memory error for Java heap space. It also takes 15-20 minutes to read and write 500 MB of data.
Is there a way to make this operation more efficient?
The read method is as follows:
ParquetFileReader reader = new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER);
ParquetMetadata readFooter = reader.getFooter();
MessageType schema = readFooter.getFileMetaData().getSchema();
ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
reader.close();
PageReadStore pages = null;
try {
    while (null != (pages = r.readNextRowGroup())) {
        long rows = pages.getRowCount();
        System.out.println("Number of rows: " + pages.getRowCount());
        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
        for (int i = 0; i < rows; i++) {
            Group g = (Group) recordReader.read();
            //printGroup(g);
            groups.add(g);
        }
    }
} finally {
    System.out.println("close the reader");
    r.close();
}
The write method is as follows:
for (Path file : files) {
    groups.addAll(readData(file));
}
System.out.println("Number of groups from the parquet files " + groups.size());
Configuration configuration = new Configuration();
Map<String, String> meta = new HashMap<String, String>();
meta.put("startkey", "1");
meta.put("endkey", "2");
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
    new Path(outputFile),
    new GroupWriteSupport(),
    CompressionCodecName.SNAPPY,
    2147483647,
    268435456,
    134217728,
    true,
    false,
    ParquetProperties.WriterVersion.PARQUET_2_0,
    configuration);
System.out.println("Number of groups to write:" + groups.size());
for (Group g : groups) {
    writer.write(g);
}
writer.close();
Recommended answer
I use these functions to merge Parquet files, but they are in Scala. Anyway, they may give you a good starting point.
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
import org.apache.parquet.schema.MessageType

import scala.collection.JavaConverters._

object ParquetFileMerger {

  // Appends the row groups of every input file to the output without decoding records,
  // using the schema and key/value metadata merged from the input footers.
  def mergeFiles(inputFiles: Seq[Path], outputFile: Path): Unit = {
    val conf = new Configuration()
    val mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles.asJava, conf).getFileMetaData
    val writer = new ParquetFileWriter(conf, mergedMeta.getSchema, outputFile, ParquetFileWriter.Mode.OVERWRITE)
    writer.start()
    inputFiles.foreach(input => writer.appendFile(HadoopInputFile.fromPath(input, conf)))
    writer.end(mergedMeta.getKeyValueMetaData)
  }

  // Same idea, but with the union of the input schemas and an explicit
  // row-group size (64 MiB) and maximum padding size (8 MiB).
  def mergeBlocks(inputFiles: Seq[Path], outputFile: Path): Unit = {
    val conf = new Configuration()
    val parquetFileReaders = inputFiles.map(getParquetFileReader)
    try {
      val mergedSchema: MessageType =
        parquetFileReaders
          .map(_.getFooter.getFileMetaData.getSchema)
          .reduce((a, b) => a.union(b))
      val writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outputFile, conf), mergedSchema, ParquetFileWriter.Mode.OVERWRITE, 64 * 1024 * 1024, 8388608)
      writer.start()
      parquetFileReaders.foreach(_.appendTo(writer))
      writer.end(new util.HashMap[String, String]())
    } finally {
      // Release the input file handles once the output has been written.
      parquetFileReaders.foreach(_.close())
    }
  }

  def getParquetFileReader(file: Path): ParquetFileReader = {
    ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))
  }
}
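The functions above avoid decoding records at all. If the records do have to be decoded (for example, to transform them), the heap problem in the question can still be avoided by handing each record to the writer as soon as it is read, instead of collecting every Group in a list first. The following is a minimal, library-free Java sketch of that pattern, not a Parquet implementation: `StreamingMergeSketch` and `mergeStreaming` are hypothetical names, each `Iterator` stands in for a per-file record reader, and the `Consumer` stands in for a call such as `writer.write(g)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class StreamingMergeSketch {

    /**
     * Copies every record from every input to the sink, one record at a time.
     * No record is retained after it has been handed to the sink, so the
     * memory held by this method does not grow with the total record count.
     * (Hypothetical helper: the iterators stand in for Parquet readers.)
     */
    static <T> long mergeStreaming(List<Iterator<T>> inputs, Consumer<T> sink) {
        long written = 0;
        for (Iterator<T> input : inputs) {
            while (input.hasNext()) {
                sink.accept(input.next()); // write immediately, keep nothing
                written++;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        // Two small in-memory "files" stand in for the input Parquet files.
        List<Iterator<String>> inputs = new ArrayList<>();
        inputs.add(Arrays.asList("a1", "a2").iterator());
        inputs.add(Arrays.asList("b1").iterator());

        // A list stands in for the output file here, purely for demonstration.
        List<String> output = new ArrayList<>();
        long n = mergeStreaming(inputs, output::add);
        System.out.println(n + " records written: " + output);
    }
}
```

Applied to the question's code, this would mean calling `writer.write(g)` inside the read loop rather than `groups.add(g)`. The writer in the question is also configured with a block (row group) size of 2147483647 bytes; since a Parquet writer buffers a row group in memory before flushing it, a smaller value is probably needed as well for the heap usage to stay bounded.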