How to efficiently read multiple small parquet files with Spark? Is there a CombineParquetInputFormat?
Problem Description
Spark generated multiple small parquet files. How can one efficiently handle these small parquet files in both the producer and the consumer Spark jobs?
Recommended Answer
import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;

// Packs many small parquet files into a single input split, so one task
// reads several files instead of one task per file.
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {

    @Override
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        CombineFileSplit combineSplit = (CombineFileSplit) split;
        // Delegate each file inside the combined split to the wrapper below.
        return new CombineFileRecordReader<>(combineSplit, context, CombineParquetRecordReader.class);
    }

    private static class CombineParquetRecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {
        public CombineParquetRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            // AvroReadSupport deserializes each parquet row into an Avro record.
            super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
        }
    }
}
On the consumer side, use this CombineParquetInputFormat: it forces multiple small files to be read by a single task.
On the producer side: use coalesce(numFiles) to write an adequate number of output files.
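One way to pick the coalesce() count is to aim each output file at a target size. The sketch below is illustrative, not part of the original answer: the `computeNumFiles` helper and the 128 MB target are assumptions, and the Spark write itself is shown only as a comment.

```java
// Sketch: derive a coalesce() partition count from an estimated dataset size,
// so each output parquet file lands near a 128 MB target (an assumed value).
public class CoalesceSizer {
    static final long TARGET_BYTES = 128L * 1024 * 1024;

    // Ceiling division, with a floor of 1 so empty input still yields one file.
    static int computeNumFiles(long totalBytes) {
        return (int) Math.max(1, (totalBytes + TARGET_BYTES - 1) / TARGET_BYTES);
    }

    public static void main(String[] args) {
        System.out.println(computeNumFiles(1024L * 1024 * 1024)); // 1 GB -> 8 files
        // On the producer job the count would then be applied as:
        // df.coalesce(computeNumFiles(estimatedBytes)).write().parquet(outputPath);
    }
}
```

coalesce() only merges partitions without a shuffle, which is why it is the cheap way to cut the file count on the producer side.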
How to use the custom input format in Spark to form an RDD and a DataFrame:
// Configure before the read: recurse into sub-directories, and cap the split
// size, else only one task will be spawned.
sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);

JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class,
        Void.class, AvroPojo.class, sc.hadoopConfiguration())
        .values()
        .map(p -> RowFactory.create(avroPojoToObjectArray(p)));

// Derive the SQL schema from the Avro schema and build the DataFrame.
StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType();
final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema);
See http://bytepadding.com/big-data/spark/combineparquetfileinputformat/ for an in-depth explanation.