Spark window partition function taking forever to complete


Problem Description

Given a dataframe, I am trying to compute how many times I have seen an emailId in the past 30 days. The main logic in my function is the following:

val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))

The config:

spark.executor.cores=5

So, I can see 5 stages which have window functions in them. Some of those stages complete very quickly (in a few seconds), but 2 of them did not finish even after 3 hours, stuck at the last few tasks (progressing very slowly).

This looks like a data skew problem to me: if I remove all the rows containing the 5 highest-frequency email ids from the dataset, the job finishes quickly (in less than 5 minutes).
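For reference, a quick way to confirm which keys are skewed is to count rows per email and look at the heaviest ones. A minimal sketch against the same df as above (the 5-row limit is just illustrative):

import org.apache.spark.sql.functions.desc

// Rows per email, heaviest first; the handful of dominant emails are the
// keys whose window partitions blow up.
df.groupBy("email")
  .count()
  .orderBy(desc("count"))
  .show(5, false)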

If I try to use some other key within window partitionBy, the job finishes in a few minutes:

 Window.partitionBy("email", "date")

But obviously it performs wrong count calculations if I do that (with a per-day partition, the 30-day range window can never look back past the current date), so it is not an acceptable solution.

I have tried various other Spark settings, throwing more memory, cores, parallelism, etc. at the job, and none of them seemed to help.

Spark Version: 2.2

Current Spark configuration:

--executor-memory: 100G

--executor-cores: 5

--driver-memory: 80G

spark.executor.memory=100g

Using machines with 16 cores and 128 GB of memory each, with up to 500 nodes.

What would be the right way to tackle this problem?

Update: Just to give more context, here is the original dataframe and the corresponding computed dataframe:

 val df = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00"),
      ("a@gmail.com", "2019-11-02 01:04:00"), 
      ("a@gmail.com", "2019-11-22 02:04:00"),
      ("a@gmail.com", "2019-11-22 05:04:00"),
      ("a@gmail.com", "2019-12-02 03:04:00"),
      ("a@gmail.com", "2020-01-01 04:04:00"),
      ("a@gmail.com", "2020-03-11 05:04:00"),
      ("a@gmail.com", "2020-04-05 12:04:00"),
      ("b@gmail.com", "2020-05-03 03:04:00")  
    ).toDF("email", "transaction_timestamp")


val expectedDF = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00", 1),
      ("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
      ("a@gmail.com", "2019-11-22 02:04:00", 2),
      ("a@gmail.com", "2019-11-22 05:04:00", 3),
      ("a@gmail.com", "2019-12-02 03:04:00", 3),
      ("a@gmail.com", "2020-01-01 04:04:00", 1),
      ("a@gmail.com", "2020-03-11 05:04:00", 1),
      ("a@gmail.com", "2020-04-05 12:04:00", 2),
      ("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", "count")
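For what it's worth, pushing this sample through the window logic at the top of the question reproduces those counts. This is only a sketch: NumberOfSecondsIn30Days is assumed to be 30*24*3600, the filter on condition is omitted, and the sample's string column is transaction_timestamp rather than timestamp:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, unix_timestamp}

val NumberOfSecondsIn30Days = 30L * 24 * 3600  // assumed value

// Parse the string timestamps into epoch seconds for the range window
val sample = df.withColumn("ts", unix_timestamp(col("transaction_timestamp")))

val win = Window
  .partitionBy("email")
  .orderBy(col("ts"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

// Should produce the same "count" column as expectedDF above
sample
  .withColumn("count", count(col("email")).over(win))
  .drop("ts")
  .show(false)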

Solution

Some of your partitions are probably too large, because for some emails there is too much data in a single month.

To fix this, you can create a new dataframe with only the emails and the timestamps. Then you group by email and timestamp, count the number of rows, and compute the window on what is hopefully much less data. The computation is sped up if timestamps tend to be duplicated, that is, if df.count is much greater than df.select("email", "timestamp").distinct.count. If that is not the case, you can truncate the timestamps at the cost of losing some precision. This way, instead of counting the number of occurrences within the last 30 days (give or take one second, since the timestamps have second precision), you count the number of occurrences give or take one minute, one hour, or even one day, depending on your needs. You lose a bit of precision but speed up your computation a lot, and the more precision you give up, the more speed you gain.

The code would look like this:

// Needed imports (plus spark.implicits._ for the $ and 'count syntax)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{sum, unix_timestamp}

// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for one day.
// Note that even precisionLoss = 1 might make you gain speed, depending on
// the distribution of your data.
val precisionLoss = 3600
val win_size = NumberOfSecondsIn30Days / precisionLoss

val winSpec = Window
  .partitionBy("email")
  .orderBy("truncated_timestamp")
  .rangeBetween(-win_size, Window.currentRow)

// Truncate timestamps to buckets of precisionLoss seconds
val new_df = df.withColumn("truncated_timestamp",
  (unix_timestamp($"timestamp") / precisionLoss).cast("long"))

val counts = new_df
  .groupBy("email", "truncated_timestamp")
  .count
  .withColumn("count", sum('count) over winSpec)

val result = new_df
  .join(counts, Seq("email", "truncated_timestamp"))
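As a rough sanity check you can reuse the sample df from the question; the code above reads a column named timestamp, so with that sample you would point unix_timestamp at transaction_timestamp instead. Then:

// Each original row now carries the windowed count for its time bucket
result
  .orderBy("email", "truncated_timestamp")
  .show(false)

The final join is what puts the windowed count back onto every original row: the expensive range window only runs over one row per (email, truncated_timestamp) pair, which shrinks the skewed email partitions from one row per event to one row per time bucket.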
