从 Spark DataFrame 中的逐列运行创建唯一的分组键 [英] Creating a unique grouping key from column-wise runs in a Spark DataFrame

查看:30
本文介绍了从 Spark DataFrame 中的逐列运行创建唯一的分组键的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有类似的东西,其中 spark 是我的 sparkContext.我在我的 sparkContext 中导入了 implicits._ 所以我可以使用 $ 语法:

I have something analogous to this, where spark is my sparkContext. I've imported implicits._ in my sparkContext so I can use the $ syntax:

val df = spark.createDataFrame(Seq(("a", 0L), ("b", 1L), ("c", 1L), ("d", 1L), ("e", 0L), ("f", 1L)))
              .toDF("id", "flag")
              .withColumn("index", monotonically_increasing_id)
              .withColumn("run_key", when($"flag" === 1, $"index").otherwise(0))

df.show

df: org.apache.spark.sql.DataFrame = [id: string, flag: bigint ... 2 more fields]
+---+----+-----+-------+
| id|flag|index|run_key|
+---+----+-----+-------+
|  a|   0|    0|      0|
|  b|   1|    1|      1|
|  c|   1|    2|      2|
|  d|   1|    3|      3|
|  e|   0|    4|      0|
|  f|   1|    5|      5|
+---+----+-----+-------+

我想为 run_key 的每个非零块创建另一个具有唯一分组键的列,与此等效:

I want to create another column with a unique grouping key for each nonzero chunk of run_key, something equivalent to this:

+---+----+-----+-------+---+
| id|flag|index|run_key|key|
+---+----+-----+-------+---|
|  a|   0|    0|      0|  0|
|  b|   1|    1|      1|  1|
|  c|   1|    2|      2|  1|
|  d|   1|    3|      3|  1|
|  e|   0|    4|      0|  0|
|  f|   1|    5|      5|  2|
+---+----+-----+-------+---+

它可能是每次运行的第一个值、每次运行的平均值或其他一些值——只要保证它是唯一的,我就可以对其进行分组以比较其他值,这并不重要组之间.

It could be the first value in each run, average of each run, or some other value -- it doesn't really matter as long as it's guaranteed to be unique so that I can group on it afterward to compare other values between groups.

顺便说一句,我不需要保留 flag0 的行.

BTW, I don't need to retain the rows where flag is 0.

推荐答案

一种方法是 1) 使用 $"flag" 中的窗口函数 lag() 创建列 $"lag1",2) 在切换 $"flag" 的行中创建另一列 $"switched" 和 $"index" 值,最后 3) 创建通过 last 从最后一个非空行复制 $"switched" 的列()rowsBetween().

One approach would be to 1) create a column $"lag1" using Window function lag() from $"flag", 2) create another column $"switched" with $"index" value in rows where $"flag" is switched, and finally 3) create the column which copies $"switched" from the last non-null row via last() and rowsBetween().

请注意,此解决方案使用 Window 函数而不进行分区,因此可能不适用于大型数据集.

Note that this solution uses Window function without partitioning hence may not work for large dataset.

val df = Seq(
  ("a", 0L), ("b", 1L), ("c", 1L), ("d", 1L), ("e", 0L), ("f", 1L),
  ("g", 1L), ("h", 0L), ("i", 0L), ("j", 1L), ("k", 1L), ("l", 1L)
).toDF("id", "flag").
  withColumn("index", monotonically_increasing_id).
  withColumn("run_key", when($"flag" === 1, $"index").otherwise(0))

import org.apache.spark.sql.expressions.Window

df.withColumn( "lag1", lag("flag", 1, -1).over(Window.orderBy("index")) ).
  withColumn( "switched", when($"flag" =!= $"lag1", $"index") ).
  withColumn( "key", last("switched", ignoreNulls = true).over(
    Window.orderBy("index").rowsBetween(Window.unboundedPreceding, 0)
  ) )

// +---+----+-----+-------+----+--------+---+
// | id|flag|index|run_key|lag1|switched|key|
// +---+----+-----+-------+----+--------+---+
// |  a|   0|    0|      0|  -1|       0|  0|
// |  b|   1|    1|      1|   0|       1|  1|
// |  c|   1|    2|      2|   1|    null|  1|
// |  d|   1|    3|      3|   1|    null|  1|
// |  e|   0|    4|      0|   1|       4|  4|
// |  f|   1|    5|      5|   0|       5|  5|
// |  g|   1|    6|      6|   1|    null|  5|
// |  h|   0|    7|      0|   1|       7|  7|
// |  i|   0|    8|      0|   0|    null|  7|
// |  j|   1|    9|      9|   0|       9|  9|
// |  k|   1|   10|     10|   1|    null|  9|
// |  l|   1|   11|     11|   1|    null|  9|
// +---+----+-----+-------+----+--------+---+

这篇关于从 Spark DataFrame 中的逐列运行创建唯一的分组键的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆