Spark is taking too much time and creating thousands of jobs for some tasks
Problem description
Machine configuration:
- RAM: 16 GB
- Processor: 4 cores (Xeon E3, 3.3 GHz)
Problem:
- Time taken: more than 18 minutes
Scenario:
Spark mode: Local
Database: Cassandra 2.1.12
I am fetching 3 tables into DataFrames, each of which has fewer than 10 rows. Yes, fewer than 10 (ten). After fetching them into DataFrames I perform join, count, show, and collect operations many times. When I execute my program, Spark creates 40404 jobs 4 times; it indicates that count requires those jobs. I use count 4-5 times in the program. After waiting for more than 18 minutes (approx. 18.5 to 20), it gives me the expected output.
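For context, a minimal sketch of how such tables are typically loaded in this setup (local mode, Cassandra 2.1 via the DataStax spark-cassandra-connector, and the Spark 1.x API implied by unionAll below; the host, keyspace, and table names are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("PermissionCheck")
  .set("spark.cassandra.connection.host", "127.0.0.1") // hypothetical host
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// One of the three small (< 10 rows) tables; keyspace/table names are hypothetical.
val sa_user_df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_ks", "table" -> "sa_user"))
  .load()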
- Why is Spark creating so many jobs?
- Is it normal (expected) to take this much time (18 minutes) to execute this number of jobs (around 40404 * 4)?
Thanks.
Sample code 1:
def getGroups(id: Array[String], level: Int): DataFrame = {
  var lvl = level
  if (level >= 0) {
    // For every id at this level, look up its group (family_id)
    // and accumulate the results with unionAll.
    for (iterated_id <- id) {
      val single_level_group = supportive_df
        .filter("id = '" + iterated_id + "' and level = " + level)
        .select("family_id")
      intermediate_df = intermediate_df.unionAll(single_level_group)
    }
    final_df = final_df.unionAll(intermediate_df)
    lvl -= 1
    // collect() is an action: it launches a Spark job on every recursion level.
    val user_id_param = intermediate_df.collect().map { row => row.getString(0) }
    intermediate_df = empty_df
    // Recurse one level down with the ids gathered above.
    getGroups(user_id_param, lvl)
  } else {
    final_df.distinct()
  }
}
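A side note on this block (an observation, not part of the accepted answer): every unionAll extends the logical plans of intermediate_df and final_df, and the collect() at each recursion level replays that growing plan as a fresh job. A hedged sketch of one common mitigation, caching the per-level union before collecting, so that later actions (the repeated counts and the final distinct()) reuse the materialized rows instead of recomputing them:

// Sketch only: cache the per-level rows before collect(), so later actions
// on final_df can reuse them instead of recomputing this branch of the plan.
intermediate_df = intermediate_df.cache()
val user_id_param = intermediate_df.collect().map { row => row.getString(0) }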
Sample code 2:
setGetGroupsVars("u_id", user_id.toString(), sa_user_df)
var user_belong_groups: DataFrame = empty_df
val user_array = Array[String](user_id.toString())
// Distinct levels at which this user appears.
val user_levels = sa_user_df.filter("id = '" + user_id + "'")
  .select("level").distinct().collect().map { x => x.getInt(0) }
println(user_levels.length + "...rapak")
println(user_id.toString())
for (u_lvl <- user_levels) {
  val x1 = getGroups(user_array, u_lvl)
  x1.show()
  empty_df.show()
  user_belong_groups.show()
  user_belong_groups = user_belong_groups.unionAll(x1)
  x1.show()
}

setGetGroupsVars("obj_id", obj_id.toString(), obj_type_specific_df)
var obj_belong_groups: DataFrame = empty_df
val obj_array = Array[String](obj_id.toString())
val obj_levels = obj_type_specific_df.filter("id = '" + obj_id + "'")
  .select("level").distinct().collect().map { x => x.getInt(0) }
println(obj_levels.length)
for (ob_lvl <- obj_levels) {
  obj_belong_groups = obj_belong_groups.unionAll(getGroups(obj_array, ob_lvl))
}

user_belong_groups = user_belong_groups.distinct()
obj_belong_groups = obj_belong_groups.distinct()

// No join condition is given, so every user group is paired with every
// object group (a Cartesian join).
var user_obj_joined_df = user_belong_groups.join(obj_belong_groups)
user_obj_joined_df.show()
println("vbgdivsivbfb")

var user_obj_access_df = user_obj_joined_df
  .join(sa_other_access_df, user_obj_joined_df("u_id") === sa_other_access_df("user_id")
    && user_obj_joined_df("obj_id") === sa_other_access_df("object_id"))
user_obj_access_df.show()
println("KDDD..")

val user_obj_access_cond1 = user_obj_access_df.filter("u_id = '" + user_id + "' and obj_id != '" + obj_id + "'")
if (user_obj_access_cond1.count() == 0) {
  val user_obj_access_cond2 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id = '" + obj_id + "'")
  if (user_obj_access_cond2.count() == 0) {
    val user_obj_access_cond3 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id != '" + obj_id + "'")
    if (user_obj_access_cond3.count() == 0) {
      default_df
    } else {
      val result_ugrp_to_objgrp = user_obj_access_cond3.select("permission").agg(max("permission"))
      println("cond4")
      result_ugrp_to_objgrp
    }
  } else {
    val result_ugrp_to_ob = user_obj_access_cond2.select("permission")
    println("cond3")
    result_ugrp_to_ob
  }
} else {
  val result_u_to_obgrp = user_obj_access_cond1.select("permission")
  println("cond2")
  result_u_to_obgrp
}
} else { // this else closes an outer if that is not shown in the snippet
  println("cond1")
  individual_access
}
These two are the major code blocks in my program where execution takes too long. It generally spends the most time on the show and count operations.
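Background on the symptom (general Spark behavior, not taken from the answer below): show, count, and collect are actions, and each action re-executes the DataFrame's entire lineage unless the result is persisted, so calling count 4-5 times over a deeply unioned plan means 4-5 full recomputations. A minimal, hedged sketch:

// Sketch: persist once, then repeated actions reuse the cached rows.
user_obj_access_df = user_obj_access_df.cache()
user_obj_access_df.count() // first action materializes the cache
user_obj_access_df.show()  // served from the cache, not recomputed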
Answer
- First, you can check in the GUI which stage of your program is taking a long time.
- Second, you are using distinct() many times, so when you use distinct() you have to look at how many partitions come out after the distinct. I think that's the reason why Spark is creating thousands of jobs.
- If that is the reason, you can use coalesce() after distinct().
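To make the suggestion concrete, a minimal sketch (assuming the Spark 1.x DataFrame API used in the question; the shuffle behind distinct() defaults to spark.sql.shuffle.partitions = 200 partitions, which is a common source of surprisingly high task counts on tiny data):

// Check how many partitions come out of distinct().
val distinct_groups = user_belong_groups.distinct()
println(distinct_groups.rdd.partitions.length) // often 200, even for < 10 rows

// For such small tables, collapse to a single partition after the shuffle.
val small_groups = distinct_groups.coalesce(1)

// Or lower the shuffle partition count globally for small data:
sqlContext.setConf("spark.sql.shuffle.partitions", "4") // hypothetical value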