How to find first non-null values in groups? (secondary sorting using dataset api)
Problem description
I am working on a dataset which represents a stream of events (e.g. fired as tracking events from a website). All the events have a timestamp. One use case we often have is trying to find the first non-null value for a given field. So, for example, something like the following gets us most of the way there:
val eventsDf = spark.read.json(jsonEventsPath)
case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )
val projectedEventsDs = eventsDf.select(
eventsDf("message.visit.id").alias("visitId"),
eventsDf("message.property.user_id").alias("userId"),
eventsDf("message.property.timestamp"),
...
).as[ProjectedFields]
projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
The problem with the above code is that the order of the data fed into that first aggregation function is not guaranteed. I would like it to be sorted by timestamp, to ensure that it picks the first non-null userId by timestamp rather than an arbitrary non-null userId.
Is there a way to define the sorting within a grouping?
Using Spark 2.10
BTW, the approach suggested for Spark 2.10 in SPARK DataFrame: select the first row of each group is to order before grouping -- but that doesn't work. For example, the following code:
case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
OrderedKeyValue("a", null, 1),
OrderedKeyValue("a", null, 2),
OrderedKeyValue("a", "x", 3),
OrderedKeyValue("a", "y", 4),
OrderedKeyValue("a", null, 5)
).toDS()
ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
will sometimes return Array([a,y]) and sometimes Array([a,x]).
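One deterministic groupBy-based workaround (a sketch of my own, not taken from the original question; the names firstNonNull and earliest are mine) is to aggregate with min over a struct, so the ordering column decides which non-null value wins:

```scala
import org.apache.spark.sql.functions.{col, min, struct, when}

// min over a struct compares fields left to right, so a (ordering, value)
// struct is minimized by the smallest ordering. Rows with a null value
// produce a null struct via when(...), which min ignores -- leaving the
// earliest non-null value per key.
val firstNonNull = ds
  .groupBy("key")
  .agg(min(when(col("value").isNotNull,
                struct(col("ordering"), col("value")))).alias("earliest"))
  .select(col("key"), col("earliest.value").alias("first"))
```

On the sample dataset above this should yield x for key a, since x is the non-null value with the lowest ordering, regardless of how the rows are partitioned.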
Recommended answer
Use my beloved windows (...and experience how much simpler your life becomes!)
import org.apache.spark.sql.expressions.Window
val byKeyOrderByOrdering = Window
.partitionBy("key")
.orderBy("ordering")
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
import org.apache.spark.sql.functions.first
val firsts = ds.withColumn("first",
first("value", ignoreNulls = true) over byKeyOrderByOrdering)
scala> firsts.show
+---+-----+--------+-----+
|key|value|ordering|first|
+---+-----+--------+-----+
| a| null| 1| x|
| a| null| 2| x|
| a| x| 3| x|
| a| y| 4| x|
| a| null| 5| x|
+---+-----+--------+-----+
NOTE: Somehow, Spark 2.2.0-SNAPSHOT (built today) could not give me the correct answer without rangeBetween, which I thought should have been the default unbounded range.
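Unlike groupBy, a window function keeps every input row, so the first column above is repeated on each row of a key. If only one row per key is wanted, one follow-up (my addition, not part of the original answer) is to project and de-duplicate afterwards:

```scala
// Collapse the windowed result to one (key, first) row per key.
val onePerKey = firsts.select("key", "first").distinct()
```

For the sample data this leaves a single row, (a, x).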