How to efficiently read multiple small Parquet files with Spark? Is there a CombineParquetInputFormat?

Problem description

Spark generates multiple small Parquet files. How can one efficiently handle these small Parquet files in both the producer and consumer Spark jobs?
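
For context, a minimal sketch of how this situation typically arises: a DataFrame write produces one Parquet file per partition, so a job with many partitions leaves many small files behind. The input path, output path, and variable names below are illustrative only, not from the original question.

    // Illustrative only: each partition of the DataFrame becomes its own Parquet file,
    // so a job with hundreds of partitions writes hundreds of small files.
    DataFrame events = sqlContext.read().json("hdfs:///logs/events");   // hypothetical input
    events.write().parquet("hdfs:///warehouse/events_parquet");         // one file per partition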

Recommended answer

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;

import java.io.IOException;

/**
 * An InputFormat that packs many small Parquet files into one combined split,
 * so a single task reads several files instead of one task per file.
 */
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {

    @Override
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        CombineFileSplit combineSplit = (CombineFileSplit) split;
        // CombineFileRecordReader iterates over the files in the combined split and
        // delegates each one to a new CombineParquetRecordReader instance.
        return new CombineFileRecordReader(combineSplit, context, CombineParquetRecordReader.class);
    }

    /**
     * Wraps a plain ParquetInputFormat record reader (with Avro read support)
     * for the idx-th file inside the combined split.
     */
    private static class CombineParquetRecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {

        public CombineParquetRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
        }
    }
}

On the consumer side, use CombineParquetInputFormat: it forces multiple small files to be read by a single task.

On the producer side: use coalesce(numFiles) to write an adequate number of output files, as in the sketch below.
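
A minimal producer-side sketch; the DataFrame name, target file count, and output path are assumed for illustration:

    // Illustrative: coalesce to a small number of partitions before writing,
    // so the job emits a few reasonably sized Parquet files instead of many tiny ones.
    int numFiles = 16;                      // assumed target; tune to data volume and desired file size
    outputDataFrame.coalesce(numFiles)      // outputDataFrame is a placeholder name
                   .write()
                   .parquet("hdfs:///warehouse/events_parquet");   // hypothetical path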

How to use this custom input format in Spark to form an RDD and a DataFrame:

    // Enable recursive listing of the input directory and cap the combined split size
    // *before* creating the RDD; without a max split size only one task will be spawned.
    sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
    sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024));

    // Read the small Parquet files through CombineParquetInputFormat and map each Avro record to a Row.
    JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, AvroPojo.class, sc.hadoopConfiguration())
                             .values()
                             .map(p -> RowFactory.create(avroPojoToObjectArray(p)));

    // Derive the SQL schema from the Avro schema and build the DataFrame.
    StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType();
    final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema);

Please refer to http://bytepadding.com/big-data/spark/combineparquetfileinputformat/ for an in-depth understanding.
