How does Spark SQL read compressed CSV files?
Problem description
I have tried using the API spark.read.csv to read compressed CSV files with the bz or gzip extension. It works, but in the source code I can't find any option parameter for declaring the codec type.

Even in this link, there is only a setting for the codec on the writing side. Could anyone tell me, or point me to the source code showing, how the Spark 2.x versions deal with compressed CSV files?
Recommended answer
All text-related data sources, including CSVDataSource, use the Hadoop File API to deal with files (as was the case for Spark Core's RDDs, too).
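As a quick check of the question's premise, no codec option is needed on the read side at all; a minimal sketch (assuming a local Spark session and a hypothetical file named people.csv.gz) would be:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("read-compressed-csv")
  .master("local[*]")
  .getOrCreate()

// Hadoop's input layer picks the codec from the file extension (.gz, .bz2, ...),
// so a compressed CSV is read exactly like a plain one -- no codec option required.
val df = spark.read
  .option("header", "true")
  .csv("people.csv.gz") // hypothetical path, for illustration only
df.show()
```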
You can find the relevant lines in readFile, which leads to HadoopFileLinesReader with the following lines:
val fileSplit = new FileSplit(
  new Path(new URI(file.filePath)),
  file.start,
  file.length,
  // TODO: Implement Locality
  Array.empty)
That uses Hadoop's org.apache.hadoop.fs.Path, which deals with compression of the underlying file(s).
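To see that codec resolution really does happen at the Hadoop layer, keyed off the file extension, you can query Hadoop's CompressionCodecFactory directly. A sketch (the file paths are made up):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

val factory = new CompressionCodecFactory(new Configuration())

// The codec is inferred purely from the extension of the path.
val gzCodec  = factory.getCodec(new Path("data/input.csv.gz"))   // GzipCodec
val bz2Codec = factory.getCodec(new Path("data/input.csv.bz2"))  // BZip2Codec
val noCodec  = factory.getCodec(new Path("data/input.csv"))      // null: not compressed

println(Option(gzCodec).map(_.getClass.getName))
```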
After some quick googling, I was able to find the Hadoop property that deals with compression: mapreduce.output.fileoutputformat.compress.
That led me to Spark SQL's CompressionCodecs with the following compression configuration:
"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)
Below in the code, you can find setCodecConfiguration, which uses "our" option.
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
  if (codec != null) {
    conf.set("mapreduce.output.fileoutputformat.compress", "true")
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
    conf.set("mapreduce.map.output.compress", "true")
    conf.set("mapreduce.map.output.compress.codec", codec)
  } else {
    // This infers the option `compression` is set to `uncompressed` or `none`.
    conf.set("mapreduce.output.fileoutputformat.compress", "false")
    conf.set("mapreduce.map.output.compress", "false")
  }
}
The other method, getCodecClassName, is used to resolve the compression option for the JSON, CSV, and text formats.
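So on the write side, that compression option is the user-facing knob that ends up in the Hadoop properties above. A sketch (df stands for any DataFrame you already have; the output path is hypothetical):

```scala
// The `compression` value is resolved through CompressionCodecs.getCodecClassName
// and then applied via setCodecConfiguration shown above.
df.write
  .option("compression", "gzip") // or "bzip2", "deflate", "lz4", "snappy", "none"
  .csv("/tmp/people-gzipped")    // hypothetical output path
```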