优化在Spark SQL中将分区数据写入S3 [英] Optimizing partitioned data writes to S3 in spark sql

查看:189
本文介绍了优化在Spark SQL中将分区数据写入S3的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在每次Spark作业运行中,我从HDFS读取大约700 GB的数据.我的工作读取此数据,过滤掉大约60%的数据,并按以下方式对其进行分区:

I have around 700 GB of data which I am reading from HDFS in each Spark job run. My job reads this data, filters around 60% of the data, partitions it like:

val toBePublishedSignals = hiveCtx.sql("some query")

toBePublishedSignals.write.partitionBy("A", "B", "C").format(JSON_DATA_FORMAT)
      .mode(SaveMode.Append).save(getS3DataPath())

val metadataFiles = hiveCtx.sql("some query")
metadataFiles.distinct().write.partitionBy("A", "C").format(JSON_DATA_FORMAT)
  .mode(SaveMode.Append).save(getS3MetadataPath())

作业卡在驱动程序上.我将驱动程序转储了下来,并将其卡在以下位置:

The job gets stuck on the driver. I took a dump of the driver and it is stuck at the following:

    at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.retrieveObjectListing(AWSS3FileSystem.java:366)
    at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.getFileStatus(AWSS3FileSystem.java:335)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:402)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:362)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    - locked <0x00000002d9b98288> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    - locked <0x00000002d9b98330> (a org.apache.spark.sql.execution.QueryExecution)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:510)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)

看起来像S3上市是一个很大的瓶颈.作业卡了几个小时,无法完成.

Looks like S3 listing is a big bottleneck. The job gets stuck for hours and does not complete.

或者无论如何我都可以像这样存储路径 在数据帧中S3://bucket/A = dvfw/B = wfwef,按路径重新划分数据帧,然后仅按"C"分区并写入路径?我不知道如何在不遍历整个数据帧并一次性保存DF的情况下执行此操作.

Or is there anyway where I can store the path like S3://bucket/A=dvfw/B=wfwef in the dataframe, repartition the dataframe by the path and then partitionBy only 'C' and write to the path? I don't know how I can do this without iterating through the entire dataframe and save the DF at one go.

从早上开始就喜欢它!寻找有关如何处理/避免这种情况的建议!

Been on it since morning! Looking for some advice on how to handle this / avoid this!

TIA!

推荐答案

据我所知,这种情况发生在您以追加模式编写并且最终位置中有很多分区时. Spark检索现有的分区,可能还会检索架构.我会提出两种可能的解决方案.

As far as I remember, this situation happens when you write in append mode and you have a lot of partitions in the final location. Spark retrieves existing partitions and probably schemas. I would suggest two possible solutions.

1)如果每次执行没有很多分区要写入,则可以尝试以下操作:

1) If you don't have a lot of partitions to write per execution you can try the following:

// Prepare data and cache it
// There are a lot of data, so a part of it most probably will be written to disk
val toBePublishedSignals = hiveCtx.sql("some query").persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = toBePublishedSignals.selectExpr("A", "B", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val b = p.getAs[String]("B"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(new Path(getS3DataPath(), s"A=$a"), s"B=$b"), s"C=$c")
    toBePublishedSignals.filter(col("A") === a && col("B") === b && col("C") === c)
                       .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

与元数据相同.

// Prepare data and cache it
val metadataFiles = hiveCtx.sql("some query").distinct().persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = metadataFiles.selectExpr("A", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(getS3MetadataPath(), s"A=$a"), s"C=$c")
    metadataFiles.filter(col("A") === a && col("C") === c)
                 .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

我不知道分区列的数据类型,因此在我的示例中,它们是字符串.上面的代码仅是示例.可以使用折叠操作并从DataFrame架构中检索数据类型,将其重写为更通用的方式.

I don't know about data types of the partition columns, so in my example they are strings. The code above is only an example. It can be rewritten to a more generic way using fold operation and retrieving data types from DataFrame schema.

2)作为一种选择,可以从要触摸的分区中读取现有数据中的记录,并与传入的记录合并.假设A/B/C对应地是year/month/day.我们有一些新数据,而df DataFrame是数据处理的结果.经过处理后,我们得到以下数据

2) As an option, it is possible to read records from partitions you are going to touch in existing data and union with incoming records. Let's imagine that A/B/C are year/month/day correspondingly. We have some new data and df DataFrame is a result of processing of the data. After processing we the following data

2018|10|11|f1|f2|f3
2018|11|14|f1|f2|f3
2018|11|15|f1|f2|f3

这意味着我们需要从包含最终数据的位置(getS3DataPath()返回的位置)读取分区

It means that we need to read partitions from the location that contains final data (location which is returned by getS3DataPath())

year=2018/month=10/day=11
year=2018/month=11/day=14
year=2018/month=11/day=15

为此,我们需要创建一个过滤器功能,该功能是其他几个功能的组合.我们使用reduce通过以下逻辑将它们组合在一起:

To do that we need to create a filter function which is a combination of several other functions. We use reduce for combining them using the following logic:

year=2018 && month=10 && day=11
or
year=2018 && month=11 && day=14
or
year=2018 && month=11 && day=15

// Do processing
val toBePublishedSignalsNew = hiveCtx.sql("some query")

// Create a filter function for querying existing data
val partitions = toBePublishedSignalsNew.selectExpr("A", "B", "C").distinct().collect()
val filterFunction = partitions.map { partitionValues =>
    partitionColumns.map { columnName =>
        (input: Row) => input.getAs[String](columnName) == partitionValues.getAs[String](columnName)
    }.reduceOption((f1, f2) => (row: Row) => f1(row) && f2(row)).getOrElse((_: Row) => false)
}.reduceOption((f1, f2) => (row: Row) => f1(row) || f2(row)).getOrElse((_: Row) => false)

// Read existing partitions that match incoming data
val toBePublishedSignalsExisting = sparkSession.read.json(getS3DataPath()).filter(filterFunction)

// Combine new and existing data and write the result to a temporary location
toBePublishedSignalsExisting
    .union(toBePublishedSignalsNew)
    .write
    .partitionBy("A", "B", "C")
    .format(JSON_DATA_FORMAT)
    .mode(SaveMode.Overwrite)
    .save(temporaryLocationS3)

此后,您需要将getS3DataPath()返回的位置中的分区替换为temporaryLocationS3中的分区.仅当分区列包含字符串时,以上示例才有效.如果它们具有其他数据类型,则可能必须为过滤器功能添加一些映射.例如,对于IntegerType,它看起来像

After that you will need to replace partitions in the location which is returned by getS3DataPath() with ones located in the temporaryLocationS3. The example above will work only if the partition columns contain strings. If they have other data types you will probably have to add some mapping for filter functions. For example for IntegerType it will look like

(input: Row) => input.getAs[Int](columnName) == partitionValues.getAs[Int](columnName)

这篇关于优化在Spark SQL中将分区数据写入S3的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆