How to optimize the Spark code below (Scala)?
Question
I have some huge files (19 GB, 40 GB, etc.). I need to execute the following algorithm on these files:
- Read the file
- Sort it on the basis of one column
- Take the first 70% of the data:
  a) Take all the distinct records of the subset of the columns
  b) Write it to the train file
- Take the last 30% of the data:
  a) Take all the distinct records of the subset of the columns
  b) Write it to the test file
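The steps above can be sketched as a toy run on a plain Scala collection (the field names and data here are made up for illustration; the real job operates on Spark DataFrames):

```scala
// Toy record with a sort column and a subset of interesting columns
case class Rec(utcDate: Int, user: String, offer: String)

val rows = List(
  Rec(5, "u1", "o1"), Rec(1, "u2", "o2"), Rec(3, "u1", "o1"),
  Rec(2, "u3", "o3"), Rec(4, "u2", "o2"), Rec(6, "u4", "o4"),
  Rec(8, "u1", "o1"), Rec(7, "u5", "o5"), Rec(9, "u6", "o6"),
  Rec(10, "u7", "o7")
)

// Sort on one column
val sorted = rows.sortBy(_.utcDate)

// First 70% -> train, last 30% -> test
val cut = (sorted.size * 0.7).toInt
val (trainPart, testPart) = sorted.splitAt(cut)

// Distinct records over a subset of the columns
val train = trainPart.map(r => (r.user, r.offer)).distinct
val test  = testPart.map(r => (r.user, r.offer)).distinct
```

With 10 rows the cut falls at 7, so `train` holds the distinct column pairs of the 7 earliest rows and `test` those of the 3 latest.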
I tried running the following code in Spark (using Scala).
import org.apache.spark.sql.functions.max // needed for the agg(max(...)) call below
val offers = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.option("delimiter", ",")
.load("input.txt")
val csvOuterJoin = offers.orderBy("utcDate")
val trainDF = csvOuterJoin.limit((csvOuterJoin.count*.7).toInt)
val maxTimeTrain = trainDF.agg(max("utcDate"))
val maxtimeStamp = maxTimeTrain.collect()(0).getTimestamp(0)
val testDF = csvOuterJoin.filter(csvOuterJoin("utcDate") > maxtimeStamp)
val inputTrain = trainDF.select("offerIdClicks","userIdClicks","userIdOffers","offerIdOffers").distinct
val inputTest = testDF.select("offerIdClicks","userIdClicks","userIdOffers","offerIdOffers").distinct
inputTrain.rdd.coalesce(1,false).saveAsTextFile("train.csv")
inputTest.rdd.coalesce(1,false).saveAsTextFile("test.csv")
This is how I initiate spark-shell:
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 --total-executor-cores 70 --executor-memory 10G --driver-memory 20G
I execute this code on a distributed cluster with one master and many slaves, each having a sufficient amount of RAM. As of now, this code ends up taking a lot of memory and I get Java heap space issues.
Is there a way to optimize the above code (preferably in Spark)? Any help in optimizing it is appreciated.
Answer
The problem is that you don't distribute at all, and the source of it is here:
val csvOuterJoin = offers.orderBy("utcDate")
val trainDF = csvOuterJoin.limit((csvOuterJoin.count*.7).toInt)
The limit operation is not designed for large-scale use: it moves all records to a single partition:
val df = spark.range(0, 10000, 1, 1000)
df.rdd.partitions.size
// Int = 1000

// Take all records by limit
df.orderBy($"id").limit(10000).rdd.partitions.size
// Int = 1
You can use the RDD API instead:
// Assign a stable index to each row in sorted order, then split at the
// 70% mark without pulling everything into a single partition
val ordered = df.orderBy($"utcDate")
val cnt = df.count * 0.7
val train = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i <= cnt
}.map(_._1), ordered.schema)
val test = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i > cnt
}.map(_._1), ordered.schema)
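The index predicates above split the data disjointly while preserving order; the same logic can be illustrated on a plain Scala sequence (a toy stand-in for ordered.rdd, with made-up values):

```scala
// 10 ordered values standing in for the sorted rows
val ordered = Seq("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
val cnt = ordered.size * 0.7 // 7.0

// Mirror the answer's predicates: <= cnt goes to train, > cnt to test,
// so the boundary index itself lands in train
val train = ordered.zipWithIndex.collect { case (v, i) if i <= cnt => v }
val test  = ordered.zipWithIndex.collect { case (v, i) if i > cnt  => v }
```

Every element ends up in exactly one of the two splits, and concatenating them reproduces the original order.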