How can I parallelize different SparkSQL execution efficiently?

Question

  • Scala
  • Apache Spark:Spark 2.2.1
  • AWS 上的 EMR:emr-5.12.1

I have one large DataFrame, created like below:

val df = spark.read.option("basePath", "s3://some_bucket/").json("s3://some_bucket/group_id=*/")

There are ~1 TB of JSON files at s3://some_bucket, split into 5000 group_id partitions. I want to execute a conversion using SparkSQL, and the conversion differs for each group_id.

The Spark code looks like this:

// Create view
val df = spark.read.option("basePath", "s3://data_lake/").json("s3://data_lake/group_id=*/")
df.createOrReplaceTempView("lakeView")

// one of queries like this:
// SELECT 
//   col1 as userId,
//   col2 as userName,
//   .....
// FROM
//   lakeView
// WHERE
//   group_id = xxx;
val queries: Seq[String] = getGroupIdMapping

// ** Want to know better ways **
queries.par.foreach(query => {
  val convertedDF: DataFrame = spark.sql(query)
  convertedDF.write.save("s3://another_bucket/")
})

par parallelizes with Runtime.getRuntime.availableProcessors threads, which equals the number of cores on the driver.

But this seems awkward and not efficient enough, because it has nothing to do with Spark's own parallelization.
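
For reference, a minimal sketch of how the thread count behind par can be controlled, assuming Scala 2.11 (the version bundled with Spark 2.2.1); the pool size 8 is an arbitrary illustration, not a recommendation:

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

val parQueries = queries.par
// replace the default pool (sized by availableProcessors) with an explicit one
parQueries.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
parQueries.foreach { query =>
  spark.sql(query).write.save("s3://another_bucket/")
}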

What I really want is something like groupBy in scala.collection.Seq.

This is not valid Spark code:

df.groupBy(groupId).foreach((groupId, parDF) => {
  parDF.createOrReplaceTempView("lakeView")
  val convertedDF: DataFrame = spark.sql(queryByGroupId)
  convertedDF.write.save("s3://another_bucket")
})

Answer

1) First of all, if your data is already stored in files per group id, there is no reason to mix it up and then group by id with Spark. It's much simpler and more efficient to load only the relevant files for each group id.
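
A minimal sketch of that idea; the queriesByGroupId mapping and the bucket paths are assumptions for illustration, not part of the original answer:

// hypothetical mapping from group id to its SQL conversion
val queriesByGroupId: Map[Int, String] = ???

queriesByGroupId.foreach { case (groupId, query) =>
  // read only this group's files instead of the whole bucket
  spark.read.json(s"s3://data_lake/group_id=$groupId/").createOrReplaceTempView("lakeView")
  spark.sql(query).write.save(s"s3://another_bucket/group_id=$groupId/")
}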

2) Spark itself parallelizes the computation. So in most cases there is no need for external parallelization. But if you feel that Spark doesn't utilize all resources, you can:

a) If each individual computation takes less than a few seconds, the task scheduling overhead is comparable to the task execution time, so it's possible to get a boost by running a few tasks in parallel.

b) The computation takes a significant amount of time, but resources are still underutilized. Then most probably you should increase the number of partitions of your dataset (see the sketch below).
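
A minimal sketch of raising the partition count, assuming the DataFrame from the question; the value 2000 is an arbitrary illustration, not a recommendation:

// spread the work over more tasks
val moreParallelDF = df.repartition(2000)
// and/or raise the parallelism of SQL shuffles (joins, aggregations)
spark.conf.set("spark.sql.shuffle.partitions", "2000")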

3) If you finally decide to run several tasks in parallel, it can be achieved this way:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
// make the execution context implicit so Future { ... } and Future.sequence pick it up
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map { query =>
  Future {
    // Spark work goes here, e.g. spark.sql(query).write.save(...)
    0
  }
}
val allDone: Future[Seq[Int]] = Future.sequence(results)
// wait for all jobs to finish
Await.result(allDone, Duration.Inf)
executor.shutdown() // otherwise the JVM will probably not exit
