Spark final task takes 100x times longer than first 199, how to improve


Question

I am seeing some performance issues while running queries using dataframes. I have seen in my research that long-running final tasks can be a sign that data is not distributed optimally, but I have not found a detailed process for resolving this issue.

I start off by loading two tables as dataframes, and I then join those tables on one field. I have tried to add distribute by (repartition) and sort by in order to improve the performance, but I am still seeing this single long-running final task. Here is a simple version of my code; note that query one and two are not actually this simple and use UDFs to calculate some values.

I have tried a few different settings for spark.sql.shuffle.partitions. I tried 100, but it failed (I didn't really debug this too much, to be honest). I tried 300, 4000, and 8000, and performance decreased with each increase. I am selecting a single day of data, where each file is an hour.

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

// Co-partition and locally sort both sides by the join key.
val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

// Left outer join on the userId field.
val df3 = sqlContext
  .sql("""
    Select
      df1.*
    from
      df1
    left outer join df2 on
      df1.userId = df2.userId""")
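
For reference, the shuffle partition counts mentioned above are controlled by the spark.sql.shuffle.partitions setting; a minimal sketch of how it is changed with this SQLContext-based API (300 is just one of the values tried above):

// Minimal sketch: the number of partitions used for shuffles (the join and
// the repartition-by-expression above) is set via spark.sql.shuffle.partitions.
sqlContext.setConf("spark.sql.shuffle.partitions", "300")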

Since it seems partitioning by userId is not ideal, I could partition by the timestamp instead. If I do this, should I just use the date + hour? If I have fewer than 200 unique combinations for this, will I have empty executors?
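
For illustration, repartitioning by coarser time columns instead of userId would look something like the sketch below; the date and hour column names are assumptions, not columns from the actual tables:

// Hypothetical sketch: repartition by date and hour instead of the skewed userId.
val byTime = df1.repartition(df1("date"), df1("hour"))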

Answer

Spark >= 3.0

Since 3.0, Spark provides built-in optimizations for handling skewed joins, which can be enabled using the spark.sql.adaptive.optimizeSkewedJoin.enabled property.

See SPARK-29544 for details.
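
As a rough sketch of what enabling this looks like (assuming a Spark 3.x SparkSession bound to spark; in released 3.x versions the skew-join optimization tracked by SPARK-29544 is exposed as spark.sql.adaptive.skewJoin.enabled and only applies when adaptive execution is on):

spark.conf.set("spark.sql.adaptive.enabled", "true")          // turn on adaptive query execution
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") // let AQE split skewed join partitions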

Spark < 3.0

You clearly have a problem with a huge right data skew. Let's take a look at the statistics you've provided:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]
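
These numbers summarize the distribution of per-userId row counts in each table; a sketch of how such statistics can be reproduced (assuming the same df1/df2 frames):

// Summarize how many rows each userId contributes in each table.
// describe("count") reports count/mean/stddev/min/max of the per-key counts.
df1.groupBy("userId").count().describe("count").show()
df2.groupBy("userId").count().describe("count").show()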

With a mean around 5 and a standard deviation over 2000 you get a long tail.

Since some keys are much more frequent than others, after repartitioning some executors will have much more work to do than the remaining ones.
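
One way to confirm this (a sketch using the distributeDf1 frame from the question) is to count how many rows each shuffle partition received after the repartition:

// Inspect per-partition row counts; a few dominant partitions indicate key skew.
import org.apache.spark.sql.functions.{desc, spark_partition_id}

distributeDf1
  .groupBy(spark_partition_id().alias("partition"))
  .count()
  .orderBy(desc("count"))
  .show(10)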

Furthermore, your description suggests that the problem may lie with a single key, or a few keys, which hash to the same partition.

So, let's first identify the outliers (pseudocode):

import sqlContext.implicits._  // for the $"..." column syntax

val mean = 4.989209978967438
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
val counts = df1.groupBy("userId").count.cache  // rows per userId

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))

and the rest:

val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))

Is this really something to be expected? If not, try to identify the source of the issue upstream.

If it is expected, you can try:

  • broadcasting the smaller table:

import org.apache.spark.sql.functions.broadcast

val df2 = sqlContext.sql("Select * from Table2")
df2.join(broadcast(df1), Seq("userId"), "rightouter")

  • splitting, unioning, and broadcasting only the frequent keys:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter")
      .union(df2.join(infrequent, Seq("userId"), "rightouter"))
    

  • salting userId with some random data (see the sketch after this list)

You should not:

  • repartition all the data and sort it locally (although sorting locally alone should not be an issue)
  • perform a standard hash join on the full data.
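
As referenced above, here is a rough sketch of the salting idea; the numSalts value and the approach of replicating df2 across salt buckets are illustrative assumptions, not code from the original answer:

// Hypothetical salting sketch: spread the hot userIds in df1 across numSalts
// buckets, replicate each df2 row across the same buckets, then join on
// (userId, salt) so no single partition receives every row of a hot key.
import org.apache.spark.sql.functions.{array, col, explode, lit, rand}

val numSalts = 16  // assumption; tune to the observed skew

val salted1 = df1.withColumn("salt", (rand() * numSalts).cast("int"))

val salted2 = df2.select(
  col("*"),
  explode(array((0 until numSalts).map(lit(_)): _*)).alias("salt"))

val joined = salted1
  .join(salted2, Seq("userId", "salt"), "left_outer")
  .drop("salt")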

