How does Spark SQL read compressed csv files?


Problem Description

I have tried using the API spark.read.csv to read compressed CSV files with a bz2 or gzip extension. It worked. But in the source code I could not find any option parameter where we can declare the codec type.

Even in this link, there is only a setting for the codec on the write side. Could anyone tell me, or point me to the source code showing, how Spark 2.x deals with compressed CSV files?
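For reference, here is a minimal sketch of the read path described above (the file paths are placeholders; any .gz or .bz2 CSV should behave the same way):

import org.apache.spark.sql.SparkSession

object ReadCompressedCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadCompressedCsv")
      .master("local[*]")
      .getOrCreate()

    // No codec option is needed on the read side: Spark (via Hadoop)
    // picks the codec from the file extension.
    val gzDf  = spark.read.option("header", "true").csv("data/people.csv.gz")
    val bz2Df = spark.read.option("header", "true").csv("data/people.csv.bz2")

    gzDf.show()
    bz2Df.show()

    spark.stop()
  }
}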

Answer

All text-related data sources, including CSVDataSource, use the Hadoop File API to deal with files (as was the case in Spark Core's RDDs, too).

You can find the relevant lines in readFile, which leads to HadoopFileLinesReader with the following lines:

val fileSplit = new FileSplit(
  new Path(new URI(file.filePath)),
  file.start,
  file.length,
  // TODO: Implement Locality
  Array.empty)

That uses Hadoop's org.apache.hadoop.fs.Path, which deals with the compression of the underlying file(s).
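As an illustrative sketch (this is not Spark's exact call site), Hadoop resolves the codec from the file extension through CompressionCodecFactory, which is what kicks in when a .gz or .bz2 file is read:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

object CodecLookup {
  def main(args: Array[String]): Unit = {
    val factory = new CompressionCodecFactory(new Configuration())

    // getCodec matches on the file extension and returns null
    // when no codec is registered for it.
    Seq("data.csv.gz", "data.csv.bz2", "data.csv").foreach { name =>
      val codec = Option(factory.getCodec(new Path(name)))
      println(s"$name -> ${codec.map(_.getClass.getName).getOrElse("no codec (plain text)")}")
    }
  }
}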

After some quick googling, I was able to find the Hadoop property that deals with compression: mapreduce.output.fileoutputformat.compress.

That led me to Spark SQL's CompressionCodecs with the following compression configuration:

private val shortCompressionCodecNames = Map(
  "none" -> null,
  "uncompressed" -> null,
  "bzip2" -> classOf[BZip2Codec].getName,
  "deflate" -> classOf[DeflateCodec].getName,
  "gzip" -> classOf[GzipCodec].getName,
  "lz4" -> classOf[Lz4Codec].getName,
  "snappy" -> classOf[SnappyCodec].getName)

Further down in the code, you can find setCodecConfiguration, which uses "our" option:

  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }
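Note that these are Hadoop output (i.e. write-side) properties, which matches the observation in the question: the compression option only exists on the write side, while on the read side the codec is inferred from the file extension.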

The other method, getCodecClassName, is used to resolve the compression option for the JSON, CSV, and text formats.
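As a minimal sketch (paths are placeholders), the same compression option therefore works across those writers:

import org.apache.spark.sql.SparkSession

object WriteOtherFormats {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")

    df.write.option("compression", "bzip2").json("output/people-bz2-json")

    // The text writer requires a single string column.
    df.select($"name").write.option("compression", "gzip").text("output/names-gzip-text")

    spark.stop()
  }
}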
