How does Spark SQL read compressed csv files?


Question

I have tried using the API spark.read.csv to read compressed CSV files with a bz2 or gzip extension. It worked, but in the source code I couldn't find any option parameter for declaring the codec type.

Even in this link, there is only a setting for the codec on the writing side. Could anyone tell me, or point me to the source code showing, how Spark 2.x deals with compressed CSV files?

Answer

All text-related data sources, including CSVDataSource, use the Hadoop File API to deal with files (as Spark Core's RDDs did, too).

You can find the relevant lines in readFile, which leads to HadoopFileLinesReader and contains the following lines:

val fileSplit = new FileSplit(
  new Path(new URI(file.filePath)),
  file.start,
  file.length,
  // TODO: Implement Locality
  Array.empty)

That uses Hadoop's org.apache.hadoop.fs.Path, which deals with compression of the underlying file(s).
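On the read side, Hadoop picks a decompression codec from the file extension (via its CompressionCodecFactory), which is why no codec option is needed when reading. Here is a minimal, Spark-free sketch of that extension lookup; the map and the function name are illustrative, not Hadoop's actual API:

```scala
// Toy version of extension-based codec inference, mimicking what Hadoop's
// CompressionCodecFactory does on the read path. The map and function name
// are illustrative only.
def codecForFile(path: String): Option[String] = {
  val byExtension = Map(
    ".gz"      -> "org.apache.hadoop.io.compress.GzipCodec",
    ".bz2"     -> "org.apache.hadoop.io.compress.BZip2Codec",
    ".deflate" -> "org.apache.hadoop.io.compress.DeflateCodec"
  )
  // Return the codec whose extension the path ends with, if any.
  byExtension.collectFirst {
    case (ext, codec) if path.endsWith(ext) => codec
  }
}
```

A plain .csv file yields no codec, so the file is read as uncompressed text.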

After some quick googling, I was able to find the Hadoop property that deals with compression: mapreduce.output.fileoutputformat.compress.

That led me to Spark SQL's CompressionCodecs with the following compression configuration:

Map(
  "none" -> null,
  "uncompressed" -> null,
  "bzip2" -> classOf[BZip2Codec].getName,
  "deflate" -> classOf[DeflateCodec].getName,
  "gzip" -> classOf[GzipCodec].getName,
  "lz4" -> classOf[Lz4Codec].getName,
  "snappy" -> classOf[SnappyCodec].getName)
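As the map shows, the short user-facing names resolve to Hadoop codec class names, with "none" and "uncompressed" mapping to null (no codec). A small stand-alone sketch of that lookup; CodecResolver is a made-up name, not Spark's internal API:

```scala
// Illustrative lookup mirroring the Map above: short option names resolve to
// Hadoop codec class names, while "none"/"uncompressed" resolve to no codec.
// CodecResolver is a hypothetical name, not Spark's internal API.
object CodecResolver {
  private val shortNames: Map[String, String] = Map(
    "none"         -> null,
    "uncompressed" -> null,
    "bzip2"        -> "org.apache.hadoop.io.compress.BZip2Codec",
    "deflate"      -> "org.apache.hadoop.io.compress.DeflateCodec",
    "gzip"         -> "org.apache.hadoop.io.compress.GzipCodec",
    "lz4"          -> "org.apache.hadoop.io.compress.Lz4Codec",
    "snappy"       -> "org.apache.hadoop.io.compress.SnappyCodec"
  )

  // Option(_) turns the null values for "none"/"uncompressed" into None,
  // and unknown names fall through to None as well.
  def resolve(option: String): Option[String] =
    shortNames.get(option.toLowerCase).flatMap(Option(_))
}
```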

Below that in the code, you can find setCodecConfiguration, which uses "our" option.

  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }
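To see which Hadoop properties end up set, the method can be exercised against a toy stand-in for Hadoop's Configuration. The Conf class below is illustrative; the property keys are the real Hadoop ones from the method above, with CompressionType.BLOCK replaced by its string value "BLOCK":

```scala
import scala.collection.mutable

// Toy key/value store standing in for org.apache.hadoop.conf.Configuration,
// just to show which properties setCodecConfiguration writes.
class Conf {
  private val kv = mutable.Map.empty[String, String]
  def set(key: String, value: String): Unit = kv(key) = value
  def get(key: String): Option[String] = kv.get(key)
}

// Same logic as the Spark method quoted above, against the toy Conf.
def setCodecConfiguration(conf: Conf, codec: String): Unit = {
  if (codec != null) {
    conf.set("mapreduce.output.fileoutputformat.compress", "true")
    conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK") // CompressionType.BLOCK
    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
    conf.set("mapreduce.map.output.compress", "true")
    conf.set("mapreduce.map.output.compress.codec", codec)
  } else {
    // The option `compression` was set to `uncompressed` or `none`.
    conf.set("mapreduce.output.fileoutputformat.compress", "false")
    conf.set("mapreduce.map.output.compress", "false")
  }
}
```

Passing a codec class name enables both output and map-output compression; passing null disables them.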

The other method, getCodecClassName, is used to resolve the compression option for the JSON, CSV, and text formats.
