Spark final task takes 100x longer than the first 199, how to improve


Problem Description

I am seeing some performance issues while running queries using dataframes. I have seen in my research that a long-running final task can be a sign that the data is not distributed optimally, but I have not found a detailed process for resolving this issue.

I start off by loading two tables as dataframes, and I then join those tables on one field. I have tried to add distribute by (repartition) and sort by in order to improve the performance, but am still seeing this single long-running final task. Here is a simple version of my code; note that queries one and two are not actually this simple and use UDFs to calculate some values.

I have tried a few different settings for spark.sql.shuffle.partitions. I tried 100, but it failed (to be honest, I didn't really debug it much). I tried 300, 4000, and 8000, and performance decreased with each increase. I am selecting a single day of data, where each file is an hour.

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select 
      df1.* 
    from 
      df1 
    left outer join df2 on 
      df1.userId = df2.userId""")
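
For reference, the shuffle-partition setting mentioned above can be changed on the sqlContext between runs; a minimal sketch, noting that the question does not show how it was actually set, and 300 is just one of the values mentioned:

// Number of partitions used when shuffling data for joins and aggregations (default is 200).
sqlContext.setConf("spark.sql.shuffle.partitions", "300")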

Since it seems partitioning by userId is not ideal, I could partition by the timestamp instead. If I do this, should I just use date + hour? If I have fewer than 200 unique combos for this, will I have empty executors?

Solution

You clearly have a problem with a huge right skew in your data. Let's take a look at the statistics you've provided:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]

With a mean around 5 and a standard deviation over 2000, you get a long tail.
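
For reference, per-key count statistics like the ones above can be computed directly on the DataFrame; a minimal sketch, assuming df1 and the userId column from the question:

// Summary statistics (count, mean, stddev, min, max) of the number of rows per userId.
val keyCounts = df1.groupBy("userId").count()
keyCounts.describe("count").show()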

Since some keys are much more frequent than others, after repartitioning some executors will have much more work to do than the remaining ones.

Furthermore, your description suggests that the problem may lie with a single key, or a few keys, that hash to the same partition.

So, let's first identify outliers (pseudocode):

val mean = 4.989209978967438 
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
val counts = df1.groupBy("userId").count.cache

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))

and the rest:

val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))
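
As a quick sanity check on how concentrated the skew is, you can also count how many keys fall above the threshold and how many rows they account for; a minimal sketch reusing counts, mean and sd from above:

import org.apache.spark.sql.functions.{count, sum}

// Number of outlier userIds and the total number of rows they contribute.
counts
  .where($"count" > mean + 2 * sd)
  .agg(count($"userId").alias("hotKeys"), sum($"count").alias("rowsInHotKeys"))
  .show()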

Is it really something to be expected? If not, try to identify the source of the issue upstream.

If it is expected, you can try:

  • broadcasting the smaller table:

    val df2 = sqlContext.sql("Select * from Table2")
    df2.join(broadcast(df1), Seq("userId"), "rightouter")
    

  • splitting, unifying (union) and broadcasting only frequent:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter")
      .union(df2.join(infrequent, Seq("userId"), "rightouter"))
    

  • salting userId with some random data (a sketch follows below)

but you shouldn't:

  • repartition all data and sort locally (although sorting locally alone shouldn't be an issue)
  • perform standard hash joins on full data.
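
The salting option could look roughly like this; a minimal sketch (not taken from the original answer), where SALT_BUCKETS and the salt column are illustrative and df1, df2 and userId come from the question:

import org.apache.spark.sql.functions.{array, col, explode, lit, rand}

val SALT_BUCKETS = 16  // illustrative; tune to the amount of skew

// Spread the rows of a hot userId across several partitions by appending a random salt.
val saltedDf1 = df1.withColumn("salt", (rand() * SALT_BUCKETS).cast("int"))

// Replicate each df2 row once per salt value so every salted df1 row still finds its match.
val saltedDf2 = df2.select(col("*"),
  explode(array((0 until SALT_BUCKETS).map(lit(_)): _*)).alias("salt"))

// The same left outer join as before, but on (userId, salt), so no single task gets all rows of a hot key.
val joined = saltedDf1
  .join(saltedDf2, Seq("userId", "salt"), "leftouter")
  .drop("salt")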
