How can I parallelize different SparkSQL execution efficiently?


Problem description


Environment

  • Scala
  • Apache Spark: Spark 2.2.1
  • EMR on AWS: emr-5.12.1

Content

I have one large DataFrame, like below:

val df = spark.read.option("basePath", "s3://some_bucket/").json("s3://some_bucket/group_id=*/")

There are ~1 TB of JSON files at s3://some_bucket, and they include 5000 partitions of group_id. I want to execute a conversion using SparkSQL, and it differs for each group_id.

The Spark code is like below:

// Create view
val df = spark.read.option("basePath", "s3://data_lake/").json("s3://data_lake/group_id=*/")
df.createOrReplaceTempView("lakeView")

// One of the queries looks like this:
// SELECT 
//   col1 as userId,
//   col2 as userName,
//   .....
// FROM
//   lakeView
// WHERE
//   group_id = xxx;
val queries: Seq[String] = getGroupIdMapping

// ** Want to know better ways **
queries.par.foreach(query => {
  val convertedDF: DataFrame = spark.sql(query)
  convertedDF.write.save("s3://another_bucket/")
})

The par can parallelize with Runtime.getRuntime.availableProcessors threads, which equals the number of the driver's cores.

But it seems weird and not efficient enough, because it has nothing to do with Spark's parallelization.

I really want to do something like the groupBy in scala.collection.Seq.

This is not valid Spark code:

df.groupBy(groupId).foreach((groupId, parDF) => {
  parDF.createOrReplaceTempView("lakeView")
  val convertedDF: DataFrame = spark.sql(queryByGroupId)
  convertedDF.write.save("s3://another_bucket")
})

Solution

1) First of all, if your data is already stored in files per group id, there is no reason to mix it up and then group by id using Spark. It's much simpler and more efficient to load only the relevant files for each group id.
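
As an illustration, a minimal sketch of this idea, assuming the set of group ids is known up front and each group's files live under s3://data_lake/group_id=<id>/; groupIds and queryFor below are hypothetical placeholders:

val groupIds: Seq[String] = ???                      // hypothetical: the known group ids
groupIds.foreach { id =>
  // Read only the files that belong to this group -- no shuffle, no groupBy needed
  val groupDF = spark.read
    .option("basePath", "s3://data_lake/")
    .json(s"s3://data_lake/group_id=$id/")

  groupDF.createOrReplaceTempView("lakeView")
  val convertedDF = spark.sql(queryFor(id))          // queryFor: hypothetical per-group query lookup
  convertedDF.write.save(s"s3://another_bucket/group_id=$id/")
}

Each iteration reads only that group's files, so Spark never has to shuffle the full 1 TB.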

2) Spark itself parallelizes the computation, so in most cases there is no need for external parallelization. But if you feel that Spark doesn't utilize all resources, you can:

a) If each individual computation takes less than a few seconds, the task scheduling overhead is comparable to the task execution time, so it's possible to get a boost by running a few tasks in parallel.

b) If the computation takes a significant amount of time but resources are still underutilized, then most probably you should increase the number of partitions of your dataset, as sketched below.
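
A minimal sketch of the kind of tuning meant in b); the value 200 is only an assumed example and should be tuned to the cluster's total cores:

// More partitions => more concurrent tasks for a single job
val widenedDF = df.repartition(200)

// Post-shuffle parallelism of SQL/DataFrame operations is controlled by this setting (default 200)
spark.conf.set("spark.sql.shuffle.partitions", "200")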

3) If you finally decide to run several tasks in parallel, it can be achieved this way:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
// Make the ExecutionContext implicit so Future.apply and Future.sequence pick it up
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map { query =>
  Future {
    // spark stuff here, e.g. spark.sql(query).write.save(...)
    0
  }
}
val allDone: Future[Seq[Int]] = Future.sequence(results)
// wait for all results
Await.result(allDone, Duration.Inf)
executor.shutdown() // otherwise the JVM will probably not exit
