Spark window custom function - getting the total number of partition records
Question
I have a timeseries dataset, which is partitioned by an id and ordered by a timestamp. Sample:
ID Timestamp Feature
"XSC" 1986-05-21 44.7530
"XSC" 1986-05-22 44.7530
"XSC" 1986-05-23 23.5678
"TM" 1982-03-08 22.2734
"TM" 1982-03-09 22.1941
"TM" 1982-03-10 22.0847
"TM" 1982-03-11 22.1741
"TM" 1982-03-12 22.1840
"TM" 1982-03-15 22.1344
I have some custom logic I need to compute, and it should be done per window, within each partition. I know Spark has rich support for window functions, which I am trying to use for this purpose.
My logic requires the total number of elements in the current window/partition, as a scalar. I need that to do some specific computations (basically, a for loop up to that count).
I tried to get it by doing:
val window = Window.partitionBy("id").orderBy("timestamp")
frame = frame.withColumn("my_cnt", count(column).over(window))
And I need to do something like:
var i = 1
var y = col("Feature")
var result = y
while (i < /* total number of records within each partition goes here */) {
result = result + lit(1) * lag(y, i).over(window) + /* complex computation */
i = i + 1
}
dataFrame.withColumn("Computed_Value", result)
How can I get the total number of records within each partition, as a scalar value? I did add that count "my_cnt" column, which holds the total for the partition, but I can't seem to be able to use it in my case.
Answer
The collect_list function of Spark allows you to aggregate windowed values into a list. This list can then be passed to a udf to do some complex calculations.
So, given this source data:
import spark.implicits._
import org.apache.spark.sql.functions._

val data = List(
  ("XSC", "1986-05-21", 44.7530),
  ("XSC", "1986-05-22", 44.7530),
  ("XSC", "1986-05-23", 23.5678),
  ("TM", "1982-03-08", 22.2734),
  ("TM", "1982-03-09", 22.1941),
  ("TM", "1982-03-10", 22.0847),
  ("TM", "1982-03-11", 22.1741),
  ("TM", "1982-03-12", 22.1840),
  ("TM", "1982-03-15", 22.1344)
).toDF("id", "timestamp", "feature")
  .withColumn("timestamp", to_date('timestamp))
And some complex function, wrapped in a UDF, applied over your records (each represented as a tuple, for instance):
import org.apache.spark.sql.Row

val complexComputationUDF = udf((list: Seq[Row]) => {
  list
    .map(row => (row.getString(0), row.getDate(1).getTime, row.getDouble(2)))
    .sortBy(-_._2) // newest first
    .foldLeft(0.0) {
      case (acc, (id, timestamp, feature)) => acc + feature
    }
})
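Since the UDF body is plain Scala, its aggregation logic can be sanity-checked without a Spark session. A minimal standalone sketch (the function name and sample values here are illustrative, not part of the original answer):

```scala
// Standalone version of the UDF's core logic: sort records newest-first
// by timestamp, then fold over them, here simply summing the features.
def complexComputation(records: Seq[(String, Long, Double)]): Double =
  records
    .sortBy(-_._2)
    .foldLeft(0.0) { case (acc, (_, _, feature)) => acc + feature }

val sample = Seq(("TM", 1L, 22.2734), ("TM", 2L, 22.1941), ("TM", 3L, 22.0847))
println(complexComputation(sample)) // sum of the three feature values
```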
You can define either a window that passes all of the partition's data to each record or, in the case of an ordered window, a running (incremental) set of data to each record:
val windowAll = Window.partitionBy("id")
val windowRunning = Window.partitionBy("id").orderBy("timestamp")
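One subtlety worth noting (an addition, not from the original answer): when a window has an orderBy, Spark's default frame runs from the start of the partition up to the current row only, which is why windowRunning produces a running computation. If you ever need both an ordering and the whole partition, the frame can be made explicit, as a sketch:

```scala
import org.apache.spark.sql.expressions.Window

// Ordered window whose frame still spans the entire partition
val windowAllOrdered = Window
  .partitionBy("id")
  .orderBy("timestamp")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
```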
And put it all together in a new dataset, like:
val newData = data
  // I'm assuming that you need id, timestamp & feature for the complex computation, so I create a struct
  .withColumn("record", struct('id, 'timestamp, 'feature))
  // Collect all records in the partition as a list of tuples and pass them to the complex computation
  .withColumn("computedValueAll",
    complexComputationUDF(collect_list('record).over(windowAll)))
  // Collect records in a time-ordered window in the partition as a list of tuples and pass them to the complex computation
  .withColumn("computedValueRunning",
    complexComputationUDF(collect_list('record).over(windowRunning)))
This results in something like:
+---+----------+-------+--------------------------+------------------+--------------------+
|id |timestamp |feature|record |computedValueAll |computedValueRunning|
+---+----------+-------+--------------------------+------------------+--------------------+
|XSC|1986-05-21|44.753 |[XSC, 1986-05-21, 44.753] |113.07379999999999|44.753 |
|XSC|1986-05-22|44.753 |[XSC, 1986-05-22, 44.753] |113.07379999999999|89.506 |
|XSC|1986-05-23|23.5678|[XSC, 1986-05-23, 23.5678]|113.07379999999999|113.07379999999999 |
|TM |1982-03-08|22.2734|[TM, 1982-03-08, 22.2734] |133.0447 |22.2734 |
|TM |1982-03-09|22.1941|[TM, 1982-03-09, 22.1941] |133.0447 |44.4675 |
|TM |1982-03-10|22.0847|[TM, 1982-03-10, 22.0847] |133.0447 |66.5522 |
|TM |1982-03-11|22.1741|[TM, 1982-03-11, 22.1741] |133.0447 |88.7263 |
|TM |1982-03-12|22.184 |[TM, 1982-03-12, 22.184] |133.0447 |110.91029999999999 |
|TM |1982-03-15|22.1344|[TM, 1982-03-15, 22.1344] |133.0447 |133.0447 |
+---+----------+-------+--------------------------+------------------+--------------------+
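Finally, to answer the literal question: if all you need is the per-partition record count as a value usable in further column expressions, collecting the rows is not required. A lighter-weight sketch (an assumption about your needs; note this yields a column repeated on every row of the partition, not a driver-side scalar):

```scala
// Count of rows in each id-partition, attached to every row of that partition
val withCount = data.withColumn("partition_cnt", count(lit(1)).over(windowAll))
```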