How to read and write Parquet files efficiently?

Problem description

I am working on a utility which reads multiple Parquet files at a time and writes them into one single output file. The implementation is very straightforward: the utility reads Parquet files from a directory, reads the Groups from all of the files and puts them into a list, then uses ParquetWriter to write all these Groups into a single file.

After reading 600 MB it throws an Out of Memory error for Java heap space. It also takes 15-20 minutes to read and write 500 MB of data.

Is there a way to make this operation more efficient?

The read method is as follows:

ParquetFileReader reader = new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER);
ParquetMetadata readFooter = reader.getFooter();
MessageType schema = readFooter.getFileMetaData().getSchema();
ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
reader.close();
PageReadStore pages = null;
try {
    while (null != (pages = r.readNextRowGroup())) {
        long rows = pages.getRowCount();
        System.out.println("Number of rows: " + pages.getRowCount());

        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
        for (int i = 0; i < rows; i++) {
            Group g = (Group) recordReader.read();
            //printGroup(g);
            groups.add(g);
        }
    }
} finally {
    System.out.println("close the reader");
    r.close();
}

The write method is as follows:

for (Path file : files) {
    groups.addAll(readData(file));
}

System.out.println("Number of groups from the parquet files " + groups.size());

Configuration configuration = new Configuration();
Map<String, String> meta = new HashMap<String, String>();
meta.put("startkey", "1");
meta.put("endkey", "2");
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
        new Path(outputFile),
        new GroupWriteSupport(),
        CompressionCodecName.SNAPPY,
        2147483647,   // block (row group) size
        268435456,    // page size
        134217728,    // dictionary page size
        true,         // enable dictionary encoding
        false,        // disable write validation
        ParquetProperties.WriterVersion.PARQUET_2_0,
        configuration);
System.out.println("Number of groups to write:" + groups.size());
for (Group g : groups) {
    writer.write(g);
}
writer.close();

Recommended answer

I use these functions to merge Parquet files, but it is in Scala. Anyway, it may give you a good starting point.

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
import org.apache.parquet.schema.MessageType

import scala.collection.JavaConverters._

object ParquetFileMerger {
    def mergeFiles(inputFiles: Seq[Path], outputFile: Path): Unit = {
        val conf = new Configuration()
        val mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles.asJava, conf).getFileMetaData
        val writer = new ParquetFileWriter(conf, mergedMeta.getSchema, outputFile, ParquetFileWriter.Mode.OVERWRITE)

        writer.start()
        inputFiles.foreach(input => writer.appendFile(HadoopInputFile.fromPath(input, conf)))
        writer.end(mergedMeta.getKeyValueMetaData)
    }

    def mergeBlocks(inputFiles: Seq[Path], outputFile: Path): Unit = {
        val conf = new Configuration()
        val parquetFileReaders = inputFiles.map(getParquetFileReader)
        val mergedSchema: MessageType =
            parquetFileReaders.
              map(_.getFooter.getFileMetaData.getSchema).
              reduce((a, b) => a.union(b))

        val writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outputFile, conf), mergedSchema, ParquetFileWriter.Mode.OVERWRITE, 64*1024*1024, 8388608)

        writer.start()
        parquetFileReaders.foreach(_.appendTo(writer))
        writer.end(new util.HashMap[String, String]())
    }

    def getParquetFileReader(file: Path): ParquetFileReader = {
        ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))
    }
}
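
For reference, a rough Java equivalent of the appendFile-based merge above might look like the sketch below. It is only a sketch under some assumptions: it takes the schema from the first input file (so all inputs are assumed to share a schema), the class name ParquetFileMergerJava is hypothetical, and the row-group size and padding values are placeholders rather than recommendations.

import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;

public class ParquetFileMergerJava {

    // Copies whole row groups from each input into one output file,
    // without materializing Group records in memory.
    public static void mergeFiles(List<Path> inputFiles, Path outputFile) throws Exception {
        Configuration conf = new Configuration();

        // Assumption: all inputs share the same schema; take it from the first file.
        MessageType schema;
        try (ParquetFileReader first =
                 ParquetFileReader.open(HadoopInputFile.fromPath(inputFiles.get(0), conf))) {
            schema = first.getFooter().getFileMetaData().getSchema();
        }

        ParquetFileWriter writer = new ParquetFileWriter(
                HadoopOutputFile.fromPath(outputFile, conf),
                schema,
                ParquetFileWriter.Mode.OVERWRITE,
                128 * 1024 * 1024,   // row group size (placeholder value)
                8 * 1024 * 1024);    // max padding size (placeholder value)

        writer.start();
        for (Path input : inputFiles) {
            // appendFile copies the input's row groups as-is, so nothing is decoded or re-encoded.
            writer.appendFile(HadoopInputFile.fromPath(input, conf));
        }
        writer.end(new HashMap<String, String>());
    }
}

Because the row groups are appended without being decoded into Groups, this kind of merge avoids both the heap blow-up and most of the CPU cost of the read-everything-then-rewrite approach in the question.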
