Spark window partition function taking forever to complete


Question

Given a dataframe, I am trying to compute how many times I have seen an emailId in the past 30 days. The main logic in my function is the following:

val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))

Configuration:

spark.executor.cores=5

So, I can see 5 stages that have window functions in them. Some of those stages complete very quickly (in a few seconds), but there are 2 that did not finish even after 3 hours, stuck at the last few tasks (progressing very slowly):

This looks like a data-skew problem to me: if I remove all the rows containing the 5 highest-frequency email ids from the dataset, the job finishes quickly (in less than 5 minutes).
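For reference, something like the following is enough to spot the skewed keys; this is only a minimal sketch, assuming the df and email column from above plus import org.apache.spark.sql.functions._:

// List the 5 most frequent emails and how many rows they account for.
val topEmails = df
  .groupBy("email")
  .count()
  .orderBy(desc("count"))
  .limit(5)

topEmails.show(false)

// Comparing their combined count against df.count() shows how concentrated the key is.
topEmails.agg(sum("count")).show()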

If I try to use some other key within window partitionBy, the job finishes in a few minutes:

 Window.partitionBy("email", "date")

But obviously that produces wrong counts, since the 30-day range window then only sees rows that share the same date, so it's not an acceptable solution.

I have tried various other Spark settings, throwing more memory, cores, parallelism, etc. at the job, but none of them seem to have helped.

Spark version: 2.2

Current Spark configuration:

-executor-memory: 100G

-executor-cores: 5

-driver-memory: 80G

-spark.executor.memory=100g

Using machines each with 16 cores and 128 GB of memory. Maximum number of nodes: up to 500.

What would be the right way to tackle this problem?

Update: Just to give more context, here is the original dataframe and the corresponding computed dataframe:

 val df = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00"),
      ("a@gmail.com", "2019-11-02 01:04:00"), 
      ("a@gmail.com", "2019-11-22 02:04:00"),
      ("a@gmail.com", "2019-11-22 05:04:00"),
      ("a@gmail.com", "2019-12-02 03:04:00"),
      ("a@gmail.com", "2020-01-01 04:04:00"),
      ("a@gmail.com", "2020-03-11 05:04:00"),
      ("a@gmail.com", "2020-04-05 12:04:00"),
      ("b@gmail.com", "2020-05-03 03:04:00")  
    ).toDF("email", "transaction_timestamp")


val expectedDF = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00", 1),
      ("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
      ("a@gmail.com", "2019-11-22 02:04:00", 2),
      ("a@gmail.com", "2019-11-22 05:04:00", 3),
      ("a@gmail.com", "2019-12-02 03:04:00", 3),
      ("a@gmail.com", "2020-01-01 04:04:00", 1),
      ("a@gmail.com", "2020-03-11 05:04:00", 1),
      ("a@gmail.com", "2020-04-05 12:04:00", 2),
      ("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", "count")

Answer

Some of your partitions are probably too large, because for some emails there is too much data in one month.

To fix this, you can create a new dataframe with only the emails and the timestamps. Then you group by email and timestamp, count the number of rows, and compute the window over (hopefully) much less data. The computation will be sped up if timestamps tend to be duplicated, that is, if df.count is much greater than df.select("email", "timestamp").distinct.count. If that is not the case, you can truncate the timestamps at the cost of losing some precision. This way, instead of counting the number of occurrences within the last 30 days (give or take one second, since timestamps are in seconds), you would count the number of occurrences give or take one minute, one hour, or even one day, depending on your need. You would lose a bit of precision but speed up your computation a lot. And the more precision you give up, the more speed you gain.

Here is the code:

// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for one day.
// Note that even precisionLoss = 1 might make you gain speed depending on
// the distribution of your data
val precisionLoss = 3600 
val win_size = NumberOfSecondsIn30Days / precisionLoss

val winSpec = Window
  .partitionBy("email")
  .orderBy("truncated_timestamp")
  .rangeBetween(-win_size, Window.currentRow)

// Truncate the timestamp to precisionLoss-second buckets so the window
// operates at the coarser resolution chosen above.
val new_df = df.withColumn("truncated_timestamp",
  (unix_timestamp($"timestamp") / precisionLoss).cast("long"))

val counts = new_df
  .groupBy("email", "truncated_timestamp")
  .count
  .withColumn("count", sum('count) over winSpec)

val result = new_df
  .join(counts, Seq("email", "truncated_timestamp"))
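If you want a rough idea of how much the grouping shrinks the input to the window before launching the full job, a quick check along these lines should do; this is just a sketch reusing new_df from above, and the message text is only illustrative:

// Estimate the reduction obtained by pre-aggregating on (email, truncated_timestamp).
val rawRows     = new_df.count
val groupedRows = new_df.select("email", "truncated_timestamp").distinct.count
println(f"window input reduced from $rawRows to $groupedRows rows " +
        f"(${groupedRows.toDouble / rawRows * 100}%.2f%% of the original)")

The bigger the gap between the two numbers, the more this pre-aggregation (or a coarser precisionLoss) will help with the skewed emails.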
