How many SparkSessions can a single application have?


Problem description


I have found that as Spark runs, and tables grow in size (through Joins) that the spark executors will eventually run out of memory and the entire system crashes. Even if I try to write temporary results to Hive tables (on HDFS), the system still doesn't free much memory, and my entire system crashes after about 130 joins.

However, through experimentation, I realized that if I break the problem into smaller pieces, write temporary results to hive tables, and Stop/Start the Spark session (and spark context), then the system's resources are freed. I was able to join over 1,000 columns using this approach.
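For illustration, here is a minimal sketch of that pattern; the table names, the join key, and the batch count are placeholders, not from the original setup:

import org.apache.spark.sql.SparkSession

// Hypothetical batched-join loop: persist each intermediate result to a Hive
// table, then stop the session (and its SparkContext) before the next batch.
def newSession(): SparkSession =
  SparkSession.builder()
    .appName("batched-joins")
    .enableHiveSupport()                 // needed to read/write Hive metastore tables
    .getOrCreate()

for (batch <- 1 to 10) {                 // placeholder: 10 batches
  val spark = newSession()
  val prev  = spark.table(s"tmp_result_${batch - 1}")   // assumes tmp_result_0 was seeded beforehand
  val next  = spark.table(s"input_table_$batch")
  prev.join(next, "id")                  // placeholder join key
      .write.mode("overwrite").saveAsTable(s"tmp_result_$batch")
  spark.stop()                           // releases the resources held for this stage
}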

But I can't find any documentation to understand if this is considered a good practice or not (I know you should not acquire multiple sessions at once). Most systems acquire the session in the beginning and close it in the end. I could also break the application into smaller ones, and use a driver like Oozie to schedule these smaller applications on Yarn. But this approach would start and stop the JVM at each stage, which seems a bit heavy-weight.

So my question: is it bad practice to continually start/stop the spark session to free system resources during the run of a single spark application?


But can you elaborate on what you mean by a single SparkContext on a single JVM? I was able to call sparkSession.sparkContext().stop(), and also stop the SparkSession. I then created a new SparkSession and used a new sparkContext. No error was thrown.
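For reference, a minimal sketch of that sequence (the app name and local master are placeholder settings, not from the original setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()

spark.sparkContext.stop()   // stops the underlying SparkContext
spark.stop()                // the session's context is already stopped

// Building a session again creates a brand new SparkContext -- no error is thrown
val spark2 = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
println(spark2.sparkContext eq spark.sparkContext)   // false: a fresh SparkContext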

I was also able to use this on the JavaSparkPi without any problems.

I have tested this in yarn-client and a local spark install.

What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?

Solution

TL;DR You can have as many SparkSessions as needed.

You can have one and only one SparkContext on a single JVM, but the number of SparkSessions is pretty much unbounded.
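A quick sketch of what that looks like in practice (the builder settings are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()

// Additional sessions all share the single SparkContext of this JVM
val s1 = spark.newSession()
val s2 = spark.newSession()

println(s1.sparkContext eq spark.sparkContext)   // true -- same SparkContext
println(s2.sparkContext eq spark.sparkContext)   // true
println(s1 eq s2)                                // false -- distinct sessions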

But can you elaborate on what you mean by a single SparkContext on a single JVM?

It means that at any given time in the lifecycle of a Spark application there can be one and only one driver, which in turn means that there is one and only one SparkContext available on that JVM.

The driver of a Spark application is where the SparkContext lives (or rather the opposite, where the SparkContext defines the driver; the distinction is pretty blurry).

You can only have one SparkContext at a time. Although you can start and stop it on demand as many times as you want, I remember an issue about it that said you should not stop the SparkContext unless you're done with Spark (which usually happens at the very end of your Spark application).

In other words, have a single SparkContext for the entire lifetime of your Spark application.

There was a similar question, What's the difference between SparkSession.sql vs Dataset.sqlContext.sql?, about multiple SparkSessions that can shed more light on why you'd want to have two or more sessions.

I was able to call sparkSession.sparkContext().stop(), and also stop the SparkSession.

So?! How does this contradict what I said?! You stopped the only SparkContext available on the JVM. Not a big deal. You could, but that's just one part of "you can have one and only one SparkContext available on a single JVM", isn't it?

SparkSession is a mere wrapper around SparkContext to offer Spark SQL's structured/SQL features on top of Spark Core's RDDs.

From the point of view of a Spark SQL developer, the purpose of a SparkSession is to be a namespace for the query entities your queries use, like tables, views, or functions (as DataFrames, Datasets, or SQL), and for Spark properties (which can have different values per SparkSession).

If you'd like to have the same (temporary) table name used for different Datasets, creating two SparkSessions would be what I'd consider the recommended way.
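A minimal sketch of that idea, assuming a spark-shell style session named spark and a placeholder view name t:

// Two sessions sharing one SparkContext, each with its own temp-view namespace
val s1 = spark.newSession()
val s2 = spark.newSession()

s1.range(1, 4).createOrReplaceTempView("t")       // rows 1, 2, 3
s2.range(100, 103).createOrReplaceTempView("t")   // rows 100, 101, 102

s1.sql("SELECT * FROM t").show()   // shows 1..3
s2.sql("SELECT * FROM t").show()   // shows 100..102
// The same name "t" resolves to different Datasets because temporary views
// are scoped to their owning SparkSession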

I've just worked on an example to showcase how whole-stage codegen works in Spark SQL and created the following snippet, which simply turns the feature off.

// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
   +- LocalTableScan [_1#88, _2#89, _3#90]

// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)

scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2

// Let's see what the initial value is
// Note that I use spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100

import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)

// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
   +- LocalTableScan [_1#121, _2#122, _3#123]

I then created a new SparkSession and used a new SparkContext. No error was thrown.

Again, how does this contradict what I said about a single SparkContext being available? I'm curious.

What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?

You can no longer use it to run Spark jobs (to process large and distributed datasets), which is pretty much exactly the reason why you use Spark in the first place, isn't it?

Try the following:

  1. Stop SparkContext
  2. Execute any processing using Spark Core's RDD or Spark SQL's Dataset APIs

An exception? Right! Remember that you closed the "doors" to Spark, so how could you expect to be inside?! :)
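For example, a small sketch of that experiment (builder settings are placeholders); the exact error message may vary by version, but the stopped context refuses to run jobs:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
val ds = spark.range(10)

spark.stop()   // stops the underlying SparkContext too

try {
  ds.count()   // triggers a job on the stopped context
} catch {
  // e.g. java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
  case e: IllegalStateException => println(e.getMessage)
}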

