How to optimize the Spark code below (Scala)?


Problem description

I have some huge files (19 GB, 40 GB, etc.). I need to execute the following algorithm on these files:

  1. Read the file
  2. Sort it on the basis of one column
  3. Take the first 70% of the data:

a) Take all the distinct records of the subset of the columns
b) Write it to the train file

  4. Take the last 30% of the data:

a) Take all the distinct records of the subset of the columns
b) Write it to the test file

I tried running the following code in Spark (using Scala):

    import scala.collection.mutable.ListBuffer
    import java.io.FileWriter
    import org.apache.spark.sql.functions.max // needed for agg(max("utcDate")) below

    val offers = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .option("delimiter", ",")
    .load("input.txt")
    val csvOuterJoin = offers.orderBy("utcDate")
    val trainDF = csvOuterJoin.limit((csvOuterJoin.count*.7).toInt)
    val maxTimeTrain = trainDF.agg(max("utcDate"))
    val maxtimeStamp = maxTimeTrain.collect()(0).getTimestamp(0)
    val testDF = csvOuterJoin.filter(csvOuterJoin("utcDate") > maxtimeStamp)    
    val inputTrain = trainDF.select("offerIdClicks","userIdClicks","userIdOffers","offerIdOffers").distinct
    val inputTest = testDF.select("offerIdClicks","userIdClicks","userIdOffers","offerIdOffers").distinct
    inputTrain.rdd.coalesce(1,false).saveAsTextFile("train.csv")
    inputTest.rdd.coalesce(1,false).saveAsTextFile("test.csv")   

This is how I initiate spark-shell:

./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 --total-executor-cores 70 --executor-memory 10G --driver-memory 20G

I execute this code on a distributed cluster with 1 master and many slaves, each having a sufficient amount of RAM. As of now, this code ends up taking a lot of memory and I run into Java heap space issues.

Is there a way to optimize the above code (preferably in Spark)? I appreciate any kind of minimal help in optimizing the above code.

Recommended answer

The problem is that you don't distribute at all, and the source of the problem is here:

val csvOuterJoin = offers.orderBy("utcDate")
val trainDF = csvOuterJoin.limit((csvOuterJoin.count*.7).toInt)

The limit operation is not designed for large-scale operations; it moves all records to a single partition:

val df = spark.range(0, 10000, 1, 1000)
df.rdd.partitions.size
// Int = 1000

// Take all records by limit
df.orderBy($"id").limit(10000).rdd.partitions.size
// Int = 1
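
For contrast, the sort on its own stays distributed; it is limit that collapses everything into a single partition. A quick check along the same lines as above:

// orderBy alone shuffles into spark.sql.shuffle.partitions range partitions
df.orderBy($"id").rdd.partitions.size
// Int = 200 (the default spark.sql.shuffle.partitions; the exact value may differ)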

You can use the RDD API instead (here df is the offers DataFrame loaded from input.txt above):

val ordered = df.orderBy($"utcDate")
val cnt = df.count * 0.7

val train = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i <= cnt 
}.map(_._1), ordered.schema)

val test = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i > cnt 
}.map(_._1), ordered.schema)
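
To finish the job without funneling everything through a single partition, the deduplicated column subsets can then be written out by Spark directly instead of via rdd.coalesce(1, false).saveAsTextFile. A minimal sketch, assuming the column names from the question and that the spark-csv package used above is still on the classpath (the train_csv / test_csv output paths are placeholders; each save produces a directory of part files, not a single CSV file):

import org.apache.spark.sql.functions.col

// Columns of interest, as listed in the question
val cols = Seq("offerIdClicks", "userIdClicks", "userIdOffers", "offerIdOffers")

// Deduplicate only the needed columns and let every executor write its own part file
train.select(cols.map(col): _*).distinct()
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("train_csv")   // placeholder output directory

test.select(cols.map(col): _*).distinct()
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("test_csv")    // placeholder output directory

If a single output file is really required, merging the part files afterwards (for example with hadoop fs -getmerge) is usually cheaper than coalescing the whole dataset to one partition inside the job.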
