Creating a unique grouping key from column-wise runs in a Spark DataFrame
Question
I have something analogous to this, where spark is my SparkSession. I've imported spark.implicits._ so I can use the $ syntax:
val df = spark.createDataFrame(Seq(("a", 0L), ("b", 1L), ("c", 1L), ("d", 1L), ("e", 0L), ("f", 1L)))
  .toDF("id", "flag")
  .withColumn("index", monotonically_increasing_id)
  .withColumn("run_key", when($"flag" === 1, $"index").otherwise(0))

df.show
df: org.apache.spark.sql.DataFrame = [id: string, flag: bigint ... 2 more fields]
+---+----+-----+-------+
| id|flag|index|run_key|
+---+----+-----+-------+
| a| 0| 0| 0|
| b| 1| 1| 1|
| c| 1| 2| 2|
| d| 1| 3| 3|
| e| 0| 4| 0|
| f| 1| 5| 5|
+---+----+-----+-------+
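For completeness, a minimal sketch of the setup the snippet above assumes (the question doesn't show it, and the appName/master values here are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._  // monotonically_increasing_id, when

val spark = SparkSession.builder()
  .appName("run-keys")   // illustrative name
  .master("local[*]")    // illustrative; use your own deployment settings
  .getOrCreate()

import spark.implicits._ // enables the $"col" syntax and Seq(...).toDF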
I want to create another column with a unique grouping key for each nonzero chunk of run_key, something equivalent to this:
+---+----+-----+-------+---+
| id|flag|index|run_key|key|
+---+----+-----+-------+---+
| a| 0| 0| 0| 0|
| b| 1| 1| 1| 1|
| c| 1| 2| 2| 1|
| d| 1| 3| 3| 1|
| e| 0| 4| 0| 0|
| f| 1| 5| 5| 2|
+---+----+-----+-------+---+
It could be the first value in each run, the average of each run, or some other value -- it doesn't really matter, as long as it's guaranteed to be unique so that I can group on it afterward to compare other values between groups.
BTW, I don't need to retain the rows where flag is 0.
Answer
One approach would be to 1) create a column $"lag1" from $"flag" using the Window function lag(), 2) create another column $"switched" that holds the $"index" value in the rows where $"flag" switches, and finally 3) create the key column by copying $"switched" forward from the last non-null row via last() and rowsBetween().
Note that this solution uses a Window function without partitioning, so it may not perform well on large datasets.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(
  ("a", 0L), ("b", 1L), ("c", 1L), ("d", 1L), ("e", 0L), ("f", 1L),
  ("g", 1L), ("h", 0L), ("i", 0L), ("j", 1L), ("k", 1L), ("l", 1L)
).toDF("id", "flag").
  withColumn("index", monotonically_increasing_id).
  withColumn("run_key", when($"flag" === 1, $"index").otherwise(0))

val result = df.
  // 1) the previous row's flag (default -1 for the first row)
  withColumn("lag1", lag("flag", 1, -1).over(Window.orderBy("index"))).
  // 2) the index value on rows where flag switches, null elsewhere
  withColumn("switched", when($"flag" =!= $"lag1", $"index")).
  // 3) carry the last non-null switched value forward as the run key
  withColumn("key", last("switched", ignoreNulls = true).over(
    Window.orderBy("index").rowsBetween(Window.unboundedPreceding, 0)
  ))

result.show
// +---+----+-----+-------+----+--------+---+
// | id|flag|index|run_key|lag1|switched|key|
// +---+----+-----+-------+----+--------+---+
// | a| 0| 0| 0| -1| 0| 0|
// | b| 1| 1| 1| 0| 1| 1|
// | c| 1| 2| 2| 1| null| 1|
// | d| 1| 3| 3| 1| null| 1|
// | e| 0| 4| 0| 1| 4| 4|
// | f| 1| 5| 5| 0| 5| 5|
// | g| 1| 6| 6| 1| null| 5|
// | h| 0| 7| 0| 1| 7| 7|
// | i| 0| 8| 0| 0| null| 7|
// | j| 1| 9| 9| 0| 9| 9|
// | k| 1| 10| 10| 1| null| 9|
// | l| 1| 11| 11| 1| null| 9|
// +---+----+-----+-------+----+--------+---+
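Since the stated goal is to group on the new key (and the flag = 0 rows aren't needed), here is a sketch of a possible follow-up, using the result val from the snippet above. This is not part of the original answer, and collect_list is just a placeholder aggregation:

result.filter($"flag" === 1).
  groupBy("key").
  agg(collect_list($"id").as("ids")). // placeholder: substitute the real per-group comparison
  orderBy("key").
  show

// Based on the table above, keys 1, 5, and 9 gather the runs
// (b, c, d), (f, g), and (j, k, l) respectively.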