Spark UDF not giving rolling counts properly


Problem description

I have a Spark UDF that calculates rolling counts of a column, precisely with respect to time. If I need to calculate a rolling count over 24 hours, then for an entry with time 2020-10-02 09:04:00 I need to look back until 2020-10-01 09:04:00 (very precise).

The rolling-count UDF works fine and gives correct counts when I run it locally, but when I run it on a cluster it gives incorrect results. Here are the sample input and output.

Input

+---------+-----------------------+
|OrderName|Time                   |
+---------+-----------------------+
|a        |2020-07-11 23:58:45.538|
|a        |2020-07-12 00:00:07.307|
|a        |2020-07-12 00:01:08.817|
|a        |2020-07-12 00:02:15.675|
|a        |2020-07-12 00:05:48.277|
+---------+-----------------------+

Output

+---------+-----------------------+-----+
|OrderName|Time                   |Count|
+---------+-----------------------+-----+
|a        |2020-07-11 23:58:45.538|1    |
|a        |2020-07-12 00:00:07.307|2    |
|a        |2020-07-12 00:01:08.817|3    |
|a        |2020-07-12 00:02:15.675|1    |
|a        |2020-07-12 00:05:48.277|1    |
+---------+-----------------------+-----+

Locally the last two entries get the values 4 and 5, but on the cluster they come out wrong. My best guess is that the data is distributed across executors and the UDF is called in parallel on each executor. Since one of the parameters to the UDF is a column (the partition key, OrderName in this example), how can I control or correct this behaviour on the cluster, if that is indeed what is happening, so that the correct count is calculated for each partition? Any suggestions, please.
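For illustration, here is a minimal, hypothetical sketch of the failure mode described above. The question does not show the actual UDF, so the data, names, and approach below (a per-partition running count via mapPartitions) are assumptions rather than the asker's code; the same effect appears with any UDF that keeps its own running state, because each task works on its own copy of that state.

import java.sql.Timestamp
import scala.collection.mutable
import spark.implicits._   // assumes a SparkSession named `spark` is in scope (e.g. spark-shell)

val dayMs = 24L * 60 * 60 * 1000

val events = Seq(
  ("a", Timestamp.valueOf("2020-07-11 23:58:45.538")),
  ("a", Timestamp.valueOf("2020-07-12 00:00:07.307")),
  ("a", Timestamp.valueOf("2020-07-12 00:05:48.277"))
).toDF("OrderName", "Time").as[(String, Timestamp)]

// Per-partition running state: OrderName -> timestamps seen in the last 24 hours.
// In local mode an order's rows often sit in a single partition, so the running
// count comes out right; on a cluster those rows can be split across tasks, each
// task restarts the history, and rows near a partition boundary count too low.
val perPartitionCounts = events.mapPartitions { rows =>
  val history = mutable.Map.empty[String, List[Long]]
  rows.map { case (order, ts) =>
    val kept = ts.getTime :: history.getOrElse(order, Nil).filter(_ > ts.getTime - dayMs)
    history(order) = kept
    (order, ts, kept.size)   // the count restarts in every partition
  }
}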

Answer

As per your comment, you want to count, for every record, the total number of records in the preceding 24 hours.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import spark.implicits._   // assumes a SparkSession named `spark` is in scope (e.g. spark-shell)

// Sample data (guessed from your question)
val df = Seq(
  ("a", "2020-07-10 23:58:45.438", "1"),
  ("a", "2020-07-11 23:58:45.538", "1"),
  ("a", "2020-07-11 23:58:45.638", "1")
).toDF("OrderName", "Time", "Count")

// Build a millisecond-precision epoch value from the Time column:
// unix_timestamp gives the seconds, and the fractional part of the string is appended
val df2 = df.withColumn("unix_time", concat(unix_timestamp($"Time"), split($"Time", "\\.")(1)).cast(LongType))

val noOfMillisecondsDay: Long = 24L * 60 * 60 * 1000

// Window per OrderName covering rows from `current time - 24 hours` up to `current time`
val winSpec = Window.partitionBy("OrderName").orderBy("unix_time")
  .rangeBetween(Window.currentRow - noOfMillisecondsDay, Window.currentRow)

// Finally, take the COUNT (or the SUM of Count) over that window, as per your need
val finalDf = df2.withColumn("tot_count", count("OrderName").over(winSpec))

// or: val finalDf = df2.withColumn("tot_count", sum("Count").over(winSpec))
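As a quick sanity check (this is my own sketch, not part of the original answer, and the variable names are assumptions): applying the same window to the five sample rows from the question should yield tot_count values of 1, 2, 3, 4, 5, since every earlier row of order a lies within 24 hours of each later one, matching what the asker sees locally. An explicit timestamp pattern is passed because, depending on the Spark version, unix_timestamp may not accept fractional seconds with the default format.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import spark.implicits._

val sample = Seq(
  ("a", "2020-07-11 23:58:45.538"),
  ("a", "2020-07-12 00:00:07.307"),
  ("a", "2020-07-12 00:01:08.817"),
  ("a", "2020-07-12 00:02:15.675"),
  ("a", "2020-07-12 00:05:48.277")
).toDF("OrderName", "Time")

// Millisecond epoch, with an explicit pattern so the fractional seconds parse
val withMs = sample.withColumn(
  "unix_time",
  concat(unix_timestamp($"Time", "yyyy-MM-dd HH:mm:ss.SSS"), split($"Time", "\\.")(1)).cast(LongType)
)

val dayMs: Long = 24L * 60 * 60 * 1000
val win = Window.partitionBy("OrderName").orderBy("unix_time")
  .rangeBetween(Window.currentRow - dayMs, Window.currentRow)

// Expect tot_count = 1, 2, 3, 4, 5 for the five rows
withMs.withColumn("tot_count", count("OrderName").over(win)).show(false)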
