How to create dynamic group in PySpark dataframe?


Question


Though the real problem involves creating multiple groups based on the values of two or more columns in consecutive rows, I am simplifying it this way. Suppose we have a PySpark dataframe like this:

>>> from pyspark.sql import Row
>>> df = sqlContext.createDataFrame([
... Row(SN=1,age=45, gender='M', name='Bob'),
... Row(SN=2,age=28, gender='M', name='Albert'),
... Row(SN=3,age=33, gender='F', name='Laura'),
... Row(SN=4,age=43, gender='F', name='Gloria'),
... Row(SN=5,age=18, gender='T', name='Simone'),
... Row(SN=6,age=45, gender='M', name='Alax'),
... Row(SN=7,age=28, gender='M', name='Robert')])
>>> df.show()

+---+---+------+------+
| SN|age|gender|  name|
+---+---+------+------+
|  1| 45|     M|   Bob|
|  2| 28|     M|Albert|
|  3| 33|     F| Laura|
|  4| 43|     F|Gloria|
|  5| 18|     T|Simone|
|  6| 45|     M|  Alax|
|  7| 28|     M|Robert|
+---+---+------+------+

Now I want to add a "section" column that keeps the same value while the gender value in consecutive rows matches, and is incremented whenever the gender changes in the next row. So, to be precise, I want output like this:

+---+---+------+------+-------+
| SN|age|gender|  name|section|
+---+---+------+------+-------+
|  1| 45|     M|   Bob|      1|
|  2| 28|     M|Albert|      1|
|  3| 33|     F| Laura|      2|
|  4| 43|     F|Gloria|      2|
|  5| 18|     T|Simone|      3|
|  6| 45|     M|  Alax|      4|
|  7| 28|     M|Robert|      4|
+---+---+------+------+-------+

Solution

It's unclear whether you're looking for a Python or a Scala solution, but they would be pretty similar - so here's a Scala solution using window functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// we'll use this window to attach the "previous" gender to each record
val globalWindow = Window.orderBy("SN")

// we'll use this window to compute "cumulative sum" of 
// an indicator column that would be 1 only if gender changed
val upToThisRowWindow = globalWindow.rowsBetween(Long.MinValue, 0)

val result = df
  .withColumn("prevGender", lag("gender", 1) over globalWindow) // add previous record's gender
  .withColumn("shouldIncrease", when($"prevGender" =!= $"gender", 1) otherwise 0) // translate to 1 or 0
  .withColumn("section", (sum("shouldIncrease") over upToThisRowWindow) + lit(1)) // cumulative sum
  .drop("prevGender", "shouldIncrease") // drop helper columns

result.show()
// +---+---+------+------+-------+
// | SN|age|gender|  name|section|
// +---+---+------+------+-------+
// |  1| 45|     M|   Bob|      1|
// |  2| 28|     M|Albert|      1|
// |  3| 33|     F| Laura|      2|
// |  4| 43|     F|Gloria|      2|
// |  5| 18|     T|Simone|      3|
// |  6| 45|     M|  Alax|      4|
// |  7| 28|     M|Robert|      4|
// +---+---+------+------+-------+
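
One thing worth noting about this approach: both windows order the whole dataset without a partitionBy clause, so Spark has to move every row into a single partition to evaluate them. That's fine for data of this size, but it won't scale to large datasets.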

And the following is the equivalent PySpark code:

from pyspark.sql import Window as W
from pyspark.sql import functions as F
import sys

globalWindow = W.orderBy("SN")
upToThisRowWindow = globalWindow.rowsBetween(-sys.maxsize - 1, 0)

df.withColumn("section", F.sum(F.when(F.lag("gender", 1).over(globalWindow) != df.gender, 1).otherwise(0)).over(upToThisRowWindow) + 1).show()
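
If you prefer the same step-by-step structure as the Scala version, here's a sketch of the equivalent PySpark logic (assuming Spark 2.1+, where Window.unboundedPreceding and Window.currentRow replace the -sys.maxsize idiom):

from pyspark.sql import Window as W
from pyspark.sql import functions as F

globalWindow = W.orderBy("SN")
upToThisRowWindow = globalWindow.rowsBetween(W.unboundedPreceding, W.currentRow)

result = (
    df
    # gender of the previous row (null on the first row)
    .withColumn("prevGender", F.lag("gender", 1).over(globalWindow))
    # 1 whenever the gender differs from the previous row, else 0
    .withColumn("shouldIncrease", F.when(F.col("prevGender") != F.col("gender"), 1).otherwise(0))
    # running total of the change indicator, shifted so sections start at 1
    .withColumn("section", F.sum("shouldIncrease").over(upToThisRowWindow) + F.lit(1))
    .drop("prevGender", "shouldIncrease")
)
result.show()

The null that lag produces on the first row never satisfies the != comparison, so that row contributes 0 to the running sum and section starts at 1.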

