Spark is taking too much time and creating thousands of jobs for some tasks


Problem description

Machine configuration:


  • RAM: 16 GB

  • Processor: 4 cores (Xeon E3, 3.3 GHz)

Problem:


  • Time taken: more than 18 minutes

Case details:



  • Spark Mode: Local


  • Database: Using Cassandra 2.1.12

I am fetching 3 tables into dataframes, each of which has fewer than 10 rows. Yes, fewer than 10 (ten). After fetching them into dataframes I perform join, count, show and collect operations many times. When I execute my program, Spark creates 40404 jobs 4 times; this indicates that count requires those jobs. I use count 4-5 times in the program. After waiting for more than 18 minutes (approx. 18.5 to 20), it gives me the expected output.


  • Why is Spark creating so many jobs?

  • Is it expected (normal) that executing this number of jobs (approx. 40404 * 4) takes this much time (18 minutes)?

Thank you.

Sample code 1:

    def getGroups(id: Array[String], level: Int): DataFrame = {
        var lvl = level
        if (level >= 0) {
          for (iterated_id <- id) {
            val single_level_group = supportive_df.filter("id = '" + iterated_id + "' and level = " + level).select("family_id")
            //single_level_group.show()
            intermediate_df = intermediate_df.unionAll(single_level_group)
            //println("for loop portion...")
          }
          final_df = final_df.unionAll(intermediate_df)
          lvl -= 1
          val user_id_param = intermediate_df.collect().map { row => row.getString(0) }
          intermediate_df = empty_df
          //println("new method...if portion...")
          getGroups(user_id_param, lvl)
        } else {
          //println("new method...")
          final_df.distinct()
        }
      }
    

Sample code 2:

     setGetGroupsVars("u_id", user_id.toString(), sa_user_df)
      var user_belong_groups: DataFrame = empty_df
      val user_array = Array[String](user_id.toString())
    
      val user_levels = sa_user_df.filter("id = '" + user_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }
    
      println(user_levels.length+"...rapak")
      println(user_id.toString())
      for (u_lvl <- user_levels) {
        val x1 = getGroups(user_array, u_lvl)
        x1.show()
        empty_df.show()
        user_belong_groups.show()
        user_belong_groups = user_belong_groups.unionAll(x1)
        x1.show()
      }
      setGetGroupsVars("obj_id", obj_id.toString(), obj_type_specific_df)
      var obj_belong_groups: DataFrame = empty_df
      val obj_array = Array[String](obj_id.toString())
      val obj_levels = obj_type_specific_df.filter("id = '" + obj_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }
      println(obj_levels.length)
      for (ob_lvl <- obj_levels) {
        obj_belong_groups = obj_belong_groups.unionAll(getGroups(obj_array, ob_lvl))
      }
      user_belong_groups = user_belong_groups.distinct()
      obj_belong_groups = obj_belong_groups.distinct()
      var user_obj_joined_df = user_belong_groups.join(obj_belong_groups)
      user_obj_joined_df.show()
    
      println("vbgdivsivbfb")
      var user_obj_access_df = user_obj_joined_df
        .join(sa_other_access_df, user_obj_joined_df("u_id") === sa_other_access_df("user_id")
          && user_obj_joined_df("obj_id") === sa_other_access_df("object_id"))
      user_obj_access_df.show()
      println("KDDD..")
    
      val user_obj_access_cond1 = user_obj_access_df.filter("u_id = '" + user_id + "' and obj_id != '" + obj_id + "'")
      if (user_obj_access_cond1.count() == 0) {
        val user_obj_access_cond2 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id = '" + obj_id + "'")
        if (user_obj_access_cond2.count() == 0) {
          val user_obj_access_cond3 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id != '" + obj_id + "'")
          if (user_obj_access_cond3.count() == 0) {
            default_df
          } else {
            val result_ugrp_to_objgrp = user_obj_access_cond3.select("permission").agg(max("permission"))
            println("cond4")
            result_ugrp_to_objgrp
          }
        } else {
          val result_ugrp_to_ob = user_obj_access_cond2.select("permission")
          println("cond3")
          result_ugrp_to_ob
        }
      } else {
        val result_u_to_obgrp = user_obj_access_cond1.select("permission")
        println("cond2")
        result_u_to_obgrp
      }
    } else {
      println("cond1")
      individual_access
    }
    

These two are the major code blocks in my program where the execution takes too long; most of that time is generally spent in the show or count operations.
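A minimal sketch of caching a small dataframe before calling several actions on it; without cache(), each count()/show()/collect() re-executes the full lineage back to Cassandra. The keyspace, table and variable names below are illustrative (not taken from the program above), and a Spark 1.x SQLContext named sqlContext with the Spark Cassandra Connector is assumed:

    // Tiny (fewer than 10 rows) table read from Cassandra; names are assumed.
    val small_df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "my_keyspace", "table" -> "supportive"))
      .load()
      .cache()                  // keep the handful of rows in memory

    small_df.count()            // first action materializes the cache
    small_df.show()             // later actions reuse the cached rows
    val ids = small_df.collect().map(_.getString(0))   // no second read from Cassandra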

Answer


  • First, you can check in the GUI which stage of your program is taking a long time.
  • Second, you are using distinct() many times, so when you use distinct() you should check how many partitions come out after the distinct. I think that is why Spark is creating thousands of jobs.
  • If that is the reason, you can use coalesce() after distinct() (see the sketch after this list).
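A minimal sketch of the last two points, applied to one of the dataframes from the question (user_belong_groups); it illustrates the technique rather than being a drop-in fix:

    // distinct() shuffles the data; with the default spark.sql.shuffle.partitions
    // (200), a dataframe of fewer than 10 rows can end up spread over far more
    // partitions than it needs, and every later action schedules a task per partition.
    val deduped = user_belong_groups.distinct()
    println(deduped.rdd.partitions.length)   // check how many partitions came out

    // Collapse the tiny result into a single partition before further joins/counts,
    // so subsequent stages schedule far fewer tasks.
    val compacted = deduped.coalesce(1)
    compacted.count()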