How to create dynamic group in PySpark dataframe?


Problem description


Though the actual problem involves creating multiple groups based on the values of two or more columns in consecutive rows, I am simplifying it this way. Suppose we have a PySpark dataframe like this:

>>> from pyspark.sql import Row
>>> df=sqlContext.createDataFrame([
... Row(SN=1,age=45, gender='M', name='Bob'),
... Row(SN=2,age=28, gender='M', name='Albert'),
... Row(SN=3,age=33, gender='F', name='Laura'),
... Row(SN=4,age=43, gender='F', name='Gloria'),
... Row(SN=5,age=18, gender='T', name='Simone'),
... Row(SN=6,age=45, gender='M', name='Alax'),
... Row(SN=7,age=28, gender='M', name='Robert')])
>>> df.show()

+---+---+------+------+
| SN|age|gender|  name|
+---+---+------+------+
|  1| 45|     M|   Bob|
|  2| 28|     M|Albert|
|  3| 33|     F| Laura|
|  4| 43|     F|Gloria|
|  5| 18|     T|Simone|
|  6| 45|     M|  Alax|
|  7| 28|     M|Robert|
+---+---+------+------+

Now I want to add a "section" column that keeps the same value as long as the gender in consecutive rows matches, and is incremented whenever the gender changes in the next row. So to be precise, I want output like this:

+---+---+------+------+-------+
| SN|age|gender|  name|section|
+---+---+------+------+-------+
|  1| 45|     M|   Bob|      1|
|  2| 28|     M|Albert|      1|
|  3| 33|     F| Laura|      2|
|  4| 43|     F|Gloria|      2|
|  5| 18|     T|Simone|      3|
|  6| 45|     M|  Alax|      4|
|  7| 28|     M|Robert|      4|
+---+---+------+------+-------+

Solution

It's unclear whether you're looking for a Python or a Scala solution, but they are very similar, so here is a Scala solution using window functions:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// we'll use this window to attach the "previous" gender to each record
val globalWindow = Window.orderBy("SN")

// we'll use this window to compute "cumulative sum" of 
// an indicator column that would be 1 only if gender changed
val upToThisRowWindow = globalWindow.rowsBetween(Long.MinValue, 0)

val result = df
  .withColumn("prevGender", lag("gender", 1) over globalWindow) // add previous record's gender
  .withColumn("shouldIncrease", when($"prevGender" =!= $"gender", 1) otherwise 0) // translate to 1 or 0
  .withColumn("section", (sum("shouldIncrease") over upToThisRowWindow) + lit(1)) // cumulative sum
  .drop("prevGender", "shouldIncrease") // drop helper columns

result.show()
// +---+---+------+------+-------+
// | SN|age|gender|  name|section|
// +---+---+------+------+-------+
// |  1| 45|     M|   Bob|      1|
// |  2| 28|     M|Albert|      1|
// |  3| 33|     F| Laura|      2|
// |  4| 43|     F|Gloria|      2|
// |  5| 18|     T|Simone|      3|
// |  6| 45|     M|  Alax|      4|
// |  7| 28|     M|Robert|      4|
// +---+---+------+------+-------+

And the following is the equivalent PySpark code:

import sys
from pyspark.sql import Window as W
from pyspark.sql import functions as F

globalWindow = W.orderBy("SN")
upToThisRowWindow = globalWindow.rowsBetween(-sys.maxsize - 1, 0)

df.withColumn("section",
              F.sum(F.when(F.lag("gender", 1).over(globalWindow) != df.gender, 1).otherwise(0))
               .over(upToThisRowWindow) + 1).show()

