Creating a unique grouping key from column-wise runs in a Spark DataFrame

Question

I have something analogous to this, where spark is my SparkSession. I've imported spark.implicits._ so I can use the $ syntax:

import org.apache.spark.sql.functions._  // monotonically_increasing_id, when

val df = spark.createDataFrame(Seq(("a", 0L), ("b", 1L), ("c", 1L), ("d", 1L), ("e", 0L), ("f", 1L)))
              .toDF("id", "flag")
              .withColumn("index", monotonically_increasing_id)
              .withColumn("run_key", when($"flag" === 1, $"index").otherwise(0))

df.show

df: org.apache.spark.sql.DataFrame = [id: string, flag: bigint ... 2 more fields]
+---+----+-----+-------+
| id|flag|index|run_key|
+---+----+-----+-------+
|  a|   0|    0|      0|
|  b|   1|    1|      1|
|  c|   1|    2|      2|
|  d|   1|    3|      3|
|  e|   0|    4|      0|
|  f|   1|    5|      5|
+---+----+-----+-------+

I want to create another column with a unique grouping key for each nonzero chunk of run_key, something equivalent to this:

+---+----+-----+-------+---+
| id|flag|index|run_key|key|
+---+----+-----+-------+---+
|  a|   0|    0|      0|  0|
|  b|   1|    1|      1|  1|
|  c|   1|    2|      2|  1|
|  d|   1|    3|      3|  1|
|  e|   0|    4|      0|  0|
|  f|   1|    5|      5|  2|
+---+----+-----+-------+---+

It could be the first value in each run, the average of each run, or some other value -- it doesn't really matter as long as it's guaranteed to be unique, so that I can group on it afterward to compare other values between groups.

BTW, I don't need to retain the rows where flag is 0.

Answer

One approach would be to 1) create a column $"lag1" from $"flag" using the window function lag(), 2) create another column $"switched" holding the $"index" value on rows where $"flag" switches, and finally 3) create the key column by copying $"switched" forward from the last non-null row via last() with rowsBetween().

Note that this solution uses window functions without partitioning, which moves all rows into a single partition, so it may not work well for large datasets.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(
  ("a", 0L), ("b", 1L), ("c", 1L), ("d", 1L), ("e", 0L), ("f", 1L),
  ("g", 1L), ("h", 0L), ("i", 0L), ("j", 1L), ("k", 1L), ("l", 1L)
).toDF("id", "flag").
  withColumn("index", monotonically_increasing_id).
  withColumn("run_key", when($"flag" === 1, $"index").otherwise(0))

df.withColumn( "lag1", lag("flag", 1, -1).over(Window.orderBy("index")) ).  // 1) previous flag (default -1 for the first row)
  withColumn( "switched", when($"flag" =!= $"lag1", $"index") ).            // 2) index where the flag switches, null elsewhere
  withColumn( "key", last("switched", ignoreNulls = true).over(             // 3) carry the last switch point forward
    Window.orderBy("index").rowsBetween(Window.unboundedPreceding, 0)
  ) ).
  show

// +---+----+-----+-------+----+--------+---+
// | id|flag|index|run_key|lag1|switched|key|
// +---+----+-----+-------+----+--------+---+
// |  a|   0|    0|      0|  -1|       0|  0|
// |  b|   1|    1|      1|   0|       1|  1|
// |  c|   1|    2|      2|   1|    null|  1|
// |  d|   1|    3|      3|   1|    null|  1|
// |  e|   0|    4|      0|   1|       4|  4|
// |  f|   1|    5|      5|   0|       5|  5|
// |  g|   1|    6|      6|   1|    null|  5|
// |  h|   0|    7|      0|   1|       7|  7|
// |  i|   0|    8|      0|   0|    null|  7|
// |  j|   1|    9|      9|   0|       9|  9|
// |  k|   1|   10|     10|   1|    null|  9|
// |  l|   1|   11|     11|   1|    null|  9|
// +---+----+-----+-------+----+--------+---+
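Since the stated goal is to group on the key afterward (and the flag = 0 rows aren't needed), a minimal sketch of that follow-up step might look like the following. The val name withKey and the aggregates run_length and first_id are illustrative, not from the original:

// Hypothetical follow-up: assign the keyed DataFrame (same chain as above,
// without the final show), drop the flag = 0 rows, and group on key.
val withKey = df.
  withColumn( "lag1", lag("flag", 1, -1).over(Window.orderBy("index")) ).
  withColumn( "switched", when($"flag" =!= $"lag1", $"index") ).
  withColumn( "key", last("switched", ignoreNulls = true).over(
    Window.orderBy("index").rowsBetween(Window.unboundedPreceding, 0)
  ) )

withKey.
  filter($"flag" === 1).                                            // flag = 0 rows aren't needed
  groupBy("key").
  agg(count(lit(1)).as("run_length"), min($"id").as("first_id")).   // example aggregates
  show

Each nonzero run then collapses to one row keyed by its first index (key = 1, 5, 9 in the sample data). If the data had a natural partitioning column, adding partitionBy to each Window spec would also address the single-partition caveat noted above.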
