How to efficiently read multiple small parquet files with Spark? Is there a CombineParquetInputFormat?


Problem description

Spark generates many small parquet files. How can one handle small parquet files efficiently in both the producer and the consumer Spark jobs?

Recommended answer

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;

import java.io.IOException;

/**
 * Packs many small parquet files into a single CombineFileSplit, so one task
 * reads several files instead of one task per file. Records are deserialized
 * through parquet-avro (AvroReadSupport).
 */
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {

    @Override
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        CombineFileSplit combineSplit = (CombineFileSplit) split;
        // CombineFileRecordReader delegates to one CombineParquetrecordReader per file in the split
        return new CombineFileRecordReader(combineSplit, context, CombineParquetrecordReader.class);
    }

    private static class CombineParquetrecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {

        public CombineParquetrecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            // Wrap the standard ParquetInputFormat so each file in the split is read with Avro read support
            super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
        }
    }
}
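If the Avro class used for deserialization does not exactly match the schema written in the files, it can help to register a read schema on the Hadoop Configuration before the job runs. A minimal sketch, assuming a JavaSparkContext named sc, an Avro-generated class AvroPojo (the same name used in the Spark code below), and that this parquet-avro version exposes AvroReadSupport.setAvroReadSchema:

    // Tell AvroReadSupport which Avro schema to deserialize records into.
    // AvroPojo is a hypothetical Avro-generated class; adjust to your own.
    org.apache.hadoop.conf.Configuration conf = sc.hadoopConfiguration();
    AvroReadSupport.setAvroReadSchema(conf, AvroPojo.getClassSchema());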

On the consumer side, use this CombineParquetInputFormat: it forces multiple small files to be read by a single task.

On the producer side, use coalesce(numFiles) so that an adequate number of files is written as output; see the sketch below.
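For illustration, a minimal producer-side sketch, assuming the Spark 1.4+ DataFrame writer API, a DataFrame named df, and an output path of your choosing (both hypothetical):

    int numFiles = 16; // pick a target count based on data volume, e.g. roughly one HDFS block per file
    df.coalesce(numFiles)
      .write()
      .parquet("hdfs:///path/to/output"); // fewer, larger parquet files for downstream jobs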

How to use the custom input format in Spark to form an RDD and a DataFrame:

    // Read input directories recursively, and set a max split size so that
    // several small files are combined per split; otherwise only one task will be spawned.
    sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
    sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024));

    // Keys are Void, values are the Avro-generated pojo; avroPojoToObjectArray is a
    // user helper that flattens the pojo into an Object[] for RowFactory.
    JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, AvroPojo.class, sc.hadoopConfiguration())
                             .values()
                             .map(p -> RowFactory.create(avroPojoToObjectArray(p)));

    // Build the SQL schema from the Avro schema and create the DataFrame
    StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType();
    final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema);
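As a quick sanity check that the combined read worked, you can query the resulting DataFrame; a small hedged example using the Spark 1.x API (the temp table name is made up):

    requiredDataFrame.registerTempTable("profiles");
    long rowCount = sqlContext.sql("SELECT COUNT(*) FROM profiles").head().getLong(0);
    System.out.println("rows read: " + rowCount);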

Refer to http://bytepadding.com/big-data/spark/combineparquetfileinputformat/ for an in-depth look.
