SQL or Pyspark - Get the last time a column had a different value for each ID


Problem Description

I am using PySpark, so I have tried both PySpark code and SQL.

I am trying to get, for each USER_ID, the last time the ADDRESS column had a different value. The rows are ordered by TIME. Take the table below:

    +---+-------+-------+----+
    | ID|USER_ID|ADDRESS|TIME|
    +---+-------+-------+----+
    |  1|      1|      A|  10|
    |  2|      1|      B|  15|
    |  3|      1|      A|  20|
    |  4|      1|      A|  40|
    |  5|      1|      A|  45|
    +---+-------+-------+----+

The correct new column I would like is shown below:

    +---+-------+-------+----+---------+
    | ID|USER_ID|ADDRESS|TIME|LAST_DIFF|
    +---+-------+-------+----+---------+
    |  1|      1|      A|  10|     null|
    |  2|      1|      B|  15|       10|
    |  3|      1|      A|  20|       15|
    |  4|      1|      A|  40|       15|
    |  5|      1|      A|  45|       15|
    +---+-------+-------+----+---------+

I have tried using different windows, but none seem to produce exactly what I want. Any ideas?

Recommended Answer

A simplified version of @jxc's answer.
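
For reference, a minimal sketch to reproduce the sample data first (this setup is my assumption, not part of the original answer; it assumes an active SparkSession bound to a variable named spark, and the df built here is what the snippet below operates on):

from pyspark.sql import SparkSession

# Assumed setup: rebuild the question's sample table
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1, 'A', 10), (2, 1, 'B', 15), (3, 1, 'A', 20),
     (4, 1, 'A', 40), (5, 1, 'A', 45)],
    ['ID', 'USER_ID', 'ADDRESS', 'TIME'])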

import pyspark.sql.functions as F  # explicit alias avoids shadowing Python's built-in sum/min
from pyspark.sql import Window

# Window over each user's rows, in id order
w = Window.partitionBy('user_id').orderBy('id')

# Start a new group whenever the address differs from the previous row's,
# and keep the previous row's time for the next step
grp_df = (df
          .withColumn('grp', F.sum(F.when(F.lag('address').over(w) == F.col('address'), 0)
                                    .otherwise(1)).over(w))
          .withColumn('prev_time', F.lag('time').over(w)))

# Window over each run of identical addresses
w_grp = Window.partitionBy('user_id', 'grp').orderBy('id')

# Running minimum of prev_time within the run = time of the last address change
grp_df.withColumn('last_addr_change_time', F.min('prev_time').over(w_grp)).show()
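
On the sample data, the final show() gives the table below, intermediate columns included (values derived by hand from the logic above; exact show() formatting may differ by Spark version):

    +---+-------+-------+----+---+---------+---------------------+
    | ID|USER_ID|ADDRESS|TIME|grp|prev_time|last_addr_change_time|
    +---+-------+-------+----+---+---------+---------------------+
    |  1|      1|      A|  10|  1|     null|                 null|
    |  2|      1|      B|  15|  2|       10|                   10|
    |  3|      1|      A|  20|  3|       15|                   15|
    |  4|      1|      A|  40|  3|       20|                   15|
    |  5|      1|      A|  45|  3|       40|                   15|
    +---+-------+-------+----+---+---------+---------------------+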

  • Use lag with a running sum to assign group numbers: the sum increments whenever the column value changes (based on the defined window). Also grab the time from the previous row, which is used in the next step.
  • Once you have the groups, use a running minimum to get the timestamp of the last change in the column value. (Looking at intermediate results, such as the table above, helps in understanding the transformations.)
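
Since the question mentions trying SQL as well, here is a sketch of the same two-step logic in Spark SQL, run through spark.sql (an addition of mine, not from the original answer; 'events' is an arbitrary view name I made up, and the lag and the running sum sit in separate CTEs because window functions cannot be nested):

# Sketch: same logic in Spark SQL, reusing the df and spark from above
df.createOrReplaceTempView('events')  # 'events' is a hypothetical view name
spark.sql("""
    WITH lagged AS (
        SELECT *,
               LAG(ADDRESS) OVER (PARTITION BY USER_ID ORDER BY ID) AS prev_address,
               LAG(TIME)    OVER (PARTITION BY USER_ID ORDER BY ID) AS prev_time
        FROM events
    ),
    grouped AS (
        SELECT *,
               SUM(CASE WHEN prev_address = ADDRESS THEN 0 ELSE 1 END)
                   OVER (PARTITION BY USER_ID ORDER BY ID) AS grp
        FROM lagged
    )
    SELECT ID, USER_ID, ADDRESS, TIME,
           MIN(prev_time) OVER (PARTITION BY USER_ID, grp ORDER BY ID) AS LAST_DIFF
    FROM grouped
    ORDER BY ID
""").show()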
