How can I optimize schema inference on a remote CSV file with Spark


Problem description

I have a remote file in S3 (or elsewhere) and I need the schema of the file. I did not find an option to sample the data the way JSON supports (e.g. read.option("samplingRatio", 0.25)).

Is there a way to optimize reading the schema?

Spark reads the entire CSV file over the network before returning the inferred schema. For large files this can take quite a long time.

.option("samplingRatio", samplingRatioVal) has no effect on CSV.

Answer

// Note: CSVOptions and TextInputCSVDataSource are Spark-internal classes;
// the import paths below are for Spark 2.x, and these classes have moved
// packages in later Spark versions.
import org.apache.spark.sql.{DataFrameReader, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.csv.{CSVOptions, TextInputCSVDataSource}
import org.apache.spark.sql.types.StructType

/**
    * Infer the schema of a remote CSV file by reading only a sample of it
    * and inferring on that. Spark's schema inference by default reads the
    * entire dataset once! For large remote files this is not desired
    * (e.g. inferring the schema of a 3GB file across oceans takes a while).
    * The speedup is achieved by reading only the first `schemaSampleSize` rows.
    *
    * @param fileLocation     path to the CSV file
    * @param schemaSampleSize number of rows to consider when inferring the schema
    * @param headerOption     whether the file has a header row
    * @param delimiterOption  field delimiter
    * @return the inferred schema
    */
  def inferSchemaFromSample(sparkSession: SparkSession, fileLocation: String, schemaSampleSize: Int, headerOption: Boolean, delimiterOption: String): StructType = {
    val dataFrameReader: DataFrameReader = sparkSession.read
    // Read only the first `schemaSampleSize` lines as plain text.
    val dataSample: Array[String] = dataFrameReader.textFile(fileLocation).head(schemaSampleSize)
    val firstLine = dataSample.head

    import sparkSession.implicits._
    val ds: Dataset[String] = sparkSession.createDataset(dataSample)

    val extraOptions = new scala.collection.mutable.HashMap[String, String]
    extraOptions += ("inferSchema" -> "true")
    extraOptions += ("header" -> headerOption.toString)
    extraOptions += ("delimiter" -> delimiterOption)

    val csvOptions: CSVOptions = new CSVOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone)
    val schema: StructType = TextInputCSVDataSource.inferFromDataset(sparkSession, ds, Some(firstLine), csvOptions)

    schema
  }

For example:

schemaSampleSize = 10000

delimiterOption = ','
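For context, the core idea of the answer above (read only the first N lines, then infer types from that sample instead of scanning the whole file) can be sketched without any Spark dependency. The sketch below is a minimal, self-contained illustration; the helper names and the three-type scheme ("int" / "double" / "string") are hypothetical simplifications, not part of Spark's API:

```scala
import scala.io.Source

// Infer a coarse column type from a sample of string values.
def inferType(values: Seq[String]): String =
  if (values.forall(_.matches("-?\\d+"))) "int"
  else if (values.forall(_.matches("-?\\d+(\\.\\d+)?"))) "double"
  else "string"

// Read only the first `sampleSize` data rows (plus the header) and
// infer each column's type from that sample alone.
def inferSchemaFromLines(lines: Iterator[String], sampleSize: Int, delimiter: String): List[(String, String)] = {
  val sample = lines.take(sampleSize + 1).toList // +1 for the header row
  val header = sample.head.split(delimiter).toList
  val rows   = sample.tail.map(_.split(delimiter).toList)
  header.zipWithIndex.map { case (name, i) =>
    name -> inferType(rows.map(_(i)))
  }
}

val csv = Seq("id,price,city", "1,9.99,Paris", "2,12.50,Osaka", "3,7.25,Lima")
// Only the first 2 data rows are examined, regardless of file size.
val schema = inferSchemaFromLines(csv.iterator, sampleSize = 2, delimiter = ",")
println(schema)
```

The same trick applies to a real file by passing `Source.fromFile(path).getLines()` as the iterator: only the sampled prefix is ever read, which is exactly why the Spark version above avoids pulling a multi-gigabyte remote file over the network.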
