如何检测pyspark数据框列中的模式何时发生变化 [英] How to detect when a pattern changes in a pyspark dataframe column

查看:69
本文介绍了如何检测pyspark数据框列中的模式何时发生变化的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个如下数据框:

+-------------------+--------+-----------+
|DateTime           |UID.    |result     |
+-------------------+--------+-----------+
|2020-02-29 11:42:34|0000111D|30         |
|2020-02-30 11:47:34|0000111D|30         |
|2020-02-30 11:48:34|0000111D|30         |
|2020-02-30 11:49:34|0000111D|30         |
|2020-02-30 11:50:34|0000111D|30         |
|2020-02-25 11:50:34|0000111D|29         |
|2020-02-25 11:50:35|0000111D|29         |
|2020-02-26 11:52:35|0000111D|29         |
|2020-02-27 11:52:35|0000111D|29         |
|2020-02-28 11:52:35|0000111D|29         |
|2020-03-01 11:52:35|0000111D|28         |
|2020-03-02 11:12:35|0000111D|28         |
|2020-03-02 11:52:35|0000111D|28         |
|2020-03-03 12:32:35|0000111D|28         |
|2020-03-04 12:02:35|0000111D|28         |
|2020-03-05 11:12:45|0000111D|28         |
|2020-03-06 11:02:45|0000111D|27         |
|2020-03-07 10:32:45|0000111D|27         |
|2020-03-08 11:52:45|0000111D|27         |
|2020-03-09 11:12:45|0000111D|27         |
|2020-03-10 11:12:45|0000111D|27         |
|2020-03-11 11:48:45|0000111D|27         |
|2020-03-12 11:02:45|0000111D|27         |
|2020-03-13 11:28:45|0000111D|26         |
|2020-03-14 11:12:45|0000111D|26         |
|2020-03-15 11:12:45|0000111D|26         |
|2020-03-16 11:28:45|0000111D|26         |
|2020-03-17 11:42:45|0000111D|26         |
|2020-03-18 11:32:45|0000111D|26         |
|2020-03-19 11:28:45|0000111D|26         |
|2020-03-27 11:28:45|0000111D|2A         |
|2020-04-20 11:12:45|0000111D|2A         |
|2020-04-27 11:15:45|0000111D|2A         |
|2020-04-28 12:17:45|0000111D|2A         |
|2020-04-29 12:17:45|0000111D|30         |
|2020-04-30 12:18:45|0000111D|30         |
|2020-04-25 12:19:45|0000111D|30         |
|2020-04-26 12:20:45|0000111D|29         |
|2020-04-27 12:27:45|0000111D|29         |
|2020-04-28 12:28:45|0000111D|29         |
|2020-04-29 12:29:45|0000111D|28         |
|2020-05-01 12:26:45|0000111D|28         |
|2020-05-02 12:26:45|0000111D|27         |
|2020-05-03 12:26:45|0000111D|27         |
|2020-05-03 12:27:45|0000111D|26         |
|2020-05-05 12:29:45|0000111D|26         |
|2020-05-07 12:30:45|0000111D|2A         |
|2020-05-08 12:33:45|0000111D|2A         |
|2020-05-09 12:26:45|0000111D|2A         |
|2020-05-12 12:26:45|0000111D|30         |
|2020-05-14 11:52:35|0000111D|29         |
|2020-05-16 11:52:35|0000111D|28         |
|2020-05-18 11:52:35|0000111D|27         |
|2020-05-20 11:52:35|0000111D|26         |
|2020-05-27 11:52:35|0000111D|2A         |
+-------------------+--------+-----------+

当结果的值在每个周期中变化时,我需要'DateTime'值.因此,基本上每个UID的周期是30到2A.现在在某些情况下可能会丢失数据,在这种情况下,必须填充数据丢失",例如对于一个周期(30-2A),如果没有记录"29",则在下面 1st_chnage 列应为"datamiss".对于每个唯一结果,除了每个循环的第一条记录外,我都必须记录最后一次发生的情况

I want the 'DateTime' value when values of result changes in each cycle. So basically 30 to 2A is a cycle for each UID. Now for some cases there can be data miss, in that case have to populate "datamiss",for example for a cycle(30-2A) if there is no record for '29' then in the below 1st_chnage column should be "datamiss". For each unique result I have to take the last occurrence EXCEPT the very first record for every cycle

基于此,我想要这样的输出:

Based on this I want a output like this:

|UID     |        start_point|         1st_change|         2nd_change|         3rd_change|         4th_change|         5th_change|
+--------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|0000111D|2020-02-29 11:42:34|2020-02-28 11:52:35|2020-03-05 11:12:45|2020-03-12 11:02:45|2020-03-19 11:28:45|2020-04-28 12:17:45|
|0000111D|2020-04-29 12:17:45|2020-04-28 12:28:45|2020-05-01 12:26:45|2020-05-03 12:26:45|2020-05-05 12:29:45|2020-05-09 12:26:45|
|0000111D|2020-05-12 12:26:45|2020-05-14 11:52:35|2020-05-16 11:52:35|2020-05-18 11:52:35|2020-05-20 11:52:35|2020-05-27 11:52:35|

考虑到我必须对每个传感器ID多次执行并且数据集有1000k条记录,我该如何以最有效的方式做到这一点.

How can I do that in most efficient way considering I have to do this multiple times for each sensor id and the data set is having 1000k records.

到目前为止,我能够做到这一点,但无法做到正确,无法处理数据丢失时的动态性

So far, I was able to do upto this but not able to get to the right point, unable to handle the dynamic ness when there is data miss

    w = Window.orderBy("DateTime")
    df_temp1=df.withColumn("rn",row_number().over(w)).\
    withColumn("lead",lead(col("result"),1).over(w)).\
    withColumn("lag",lag(col("result"),1).over(w)).withColumn("mismatch_bool",when((col('lead') != col('lag')),lit("true")).otherwise(lit("False")))

基于此,我想要这样的输出:

Based on this I want a output like this:

sensorid  start_point         1st_change          2nd_change           3rd chnage          4th_change           5th chnage
0000126D  2020-02-23 11:42:34 2020-02-24 11:49:34 2020-02-25 11:52:34  2020-02-26 11:34:35 2020-02-28 11:43:35  null
0000126D  2020-03-01 11:23:35 2020-03-04 11:31:35 2020-03-06 11:17:35  2020-03-08 09:34:09 2020-03-10 11:34:09  2020-03-08 07:34:09

考虑到我必须对每个传感器ID多次执行并且数据集有1000k条记录,我该如何以最有效的方式做到这一点.

How can I do that in most efficient way considering I have to do this multiple times for each sensor id and the data set is having 1000k records.

到目前为止,我能够做到这一点.

So far, I was able to do upto this.

    w = Window.orderBy("DateTime")
    df_temp1=df_records_indiv_sensor.withColumn("rn",row_number().over(w)).\
    withColumn("lead",lead(col("result"),1).over(w)).\
    withColumn("lag",lag(col("result"),1).over(w)).withColumn("mismatch_bool",when((col('lead') != col('lag')),lit("true")).otherwise(lit("False")))

推荐答案

Spark2.4 only.

Spark2.4 only.

不知道这是否是您想要的东西,但是我还是写了它,所以想把它贴出来.这里有两个真正的挑战. First 用于在30-2A的数据中创建创建分区,并能够在这些分区中找到所需的更改. Second ,用于处理丢失的行,以便仅将其发送到包含丢失行的间隔中.(使用 sequence 等解决).

Not sure if this is something you would want, but I wrote it anyways so thought id post it. There are 2 real challenges here. First is to get create partitions in data that go from 30-2A and be able to find desired changes in those partitions. Second, is to handle the missing row such that it is only sent to the interval with the missing row.(solved using sequence etc).

这整个代码可能不完全是您想要的(我可能有些不高兴),但是您可以 take parts of it and try them 它们可以帮助您实现我们的目标目标..

如果这正是您想要的,我将进一步详细解释代码.但是您应该能够遵循其中的大部分内容.

In the case that this is exactly what you want, Ill explain the code in further detail. But you should be able to follow most of it.

df.show()#your sample dataframe
+-------------------+--------+------+
|           DateTime|     UID|result|
+-------------------+--------+------+
|2020-02-23 11:42:34|0000111D|    30|
|2020-02-24 11:47:34|0000111D|    30|
|2020-02-24 11:48:34|0000111D|    29|
|2020-02-24 11:49:34|0000111D|    29|
|2020-02-24 11:50:34|0000111D|    28|
+-------------------+--------+------+
#only showing top 5 rows

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("result").orderBy("DateTime")
w1=Window().partitionBy("UID").orderBy("DateTime")
w2=Window().partitionBy("UID","inc_sum").orderBy("DateTime")
w3=Window().partitionBy("UID","inc_sum")
w4=Window().partitionBy("DateTime","UID","inc_sum").orderBy("DateTime")
df.withColumn("cor",F.row_number().over(w))\
  .withColumn("yo", F.when((F.col("cor")%2!=0) & (F.col("result")==30),F.lit(1)).otherwise(F.lit(0)))\
  .withColumn("inc_sum", F.sum("yo").over(w1))\
  .withColumn("cor", F.when((F.col("result")!=30) & (F.col("cor")%2==0), F.lit('change')).otherwise(F.lit('no')))\
        .withColumn("row_num", F.row_number().over(w2))\
        .withColumn("first", F.min("row_num").over(w3))\
        .withColumn("max", F.max("row_num").over(w3)).drop("yo","row_num","first","max")\
        .filter("row_num=first or row_num=max or cor='change'")\
        .withColumn("all1", F.collect_list("result").over(w3))\
        .withColumn("all", F.array(*[F.lit(x) for x in ['30','29','28','27','26','2A']]))\
        .withColumn("except", F.array_except("all","all1")[0])\
        .withColumn("result", F.when(F.col("except")+1==F.col("result"), F.expr("""sequence(int(except)+1,int(except),-1)"""))\
                    .otherwise(F.expr("""sequence(int(result),int(result),0)""")))\
        .withColumn("result", F.when(F.col("result").isNull(), F.array(F.lit(2))).otherwise(F.col("result")))\
        .select("DateTime","UID",F.explode("result").alias("result"),"inc_sum")\
        .withColumn("rownum2", F.row_number().over(w4))\
        .withColumn("DateTime", F.when((F.col("rownum2")>1), F.lit(0))\
                    .otherwise(F.col("DateTime"))).orderBy("DateTime")\
        .groupBy("UID").pivot("result").agg((F.collect_list("DateTime")))\
        .withColumn("zip", F.explode(F.arrays_zip(*['30','29','28','27','26','2'])))\
        .select("UID", "zip.*")\
        .select("UID", F.col("30").alias("start_point"),F.col("29").alias("1st_change"),F.col("28").alias("2nd_change")\
                ,F.col("27").alias("3rd_change"),F.col("26").alias("4th_change"),F.col("2").alias("5th_change"))\
                .replace('0',"datamiss").show()

+--------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|     UID|        start_point|         1st_change|         2nd_change|         3rd_change|         4th_change|         5th_change|
+--------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|0000111D|2020-02-23 11:42:34|2020-02-24 11:49:34|2020-02-25 11:52:34|2020-02-26 11:34:35|           datamiss|2020-02-28 11:43:35|
|0000111D|2020-03-01 11:23:35|2020-03-04 11:31:35|2020-03-06 11:17:35|2020-03-08 11:34:09|2020-03-10 04:12:45|2020-03-12 07:34:09|
+--------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+

UPDATED SOLUTION :

UPDATED SOLUTION:

基于提供的新数据.该代码能够处理启动周期并非总是以24开始的情况,并使用arrays_zip逻辑而不是顺序来处理数据丢失的情况.

Based on new data provided. This code was able to handle cases where start cycle does not always start with 24, and handled data miss using arrays_zip logic instead of sequence.

df.show()#new sample dataframe
+-------------------+---------+--------+-----------+-------+-----------+
|           DateTime|Identity |UID      Code       |len    |result|
+-------------------+---------+--------+-----------+-------+-----------+
|2020-02-25 11:50:34|       38|0000796D|         35|      2|         23|
|2020-02-25 11:50:35|       38|0000796D|         35|      2|         23|
|2020-02-26 11:52:35|       38|0000796D|         35|      2|         23|
|2020-02-27 11:52:35|       38|0000796D|         35|      2|         23|
|2020-02-28 11:52:35|       38|0000796D|         35|      2|         23|
+-------------------+---------+--------+-----------+-------+-----------+
#only showing top 5 rows

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import when

w=Window().partitionBy("UID").orderBy("DateTime")
w5=Window().partitionBy("UID","result","inc_sum").orderBy("DateTime")
w6=Window().partitionBy("UID","result","inc_sum")
w2=Window().partitionBy("UId","inc_sum").orderBy("DateTime")
w3=Window().partitionBy("UId","inc_sum")
w4=Window().partitionBy("DateTime","UId","inc_sum").orderBy("DateTime")
df.withColumn("lag", F.lag("result").over(w))\
.withColumn("lag", F.when(F.col("lag").isNull(),F.lit(-1)).otherwise(F.col("lag")))\
.withColumn("inc_sum", F.when((F.col("result")=='24')\
& (F.col("lag")!='24'),F.lit(1)).when((F.col("result")=='23')\
& (F.col("lag")!='24')&(F.col("lag")!='23'),F.lit(1)).otherwise(F.lit(0)))\
.withColumn("inc_sum", F.sum("inc_sum").over(w))\
.withColumn("row_num", F.row_number().over(w2))\
.withColumn("first", F.min("row_num").over(w3))\
.withColumn("max", F.max("row_num").over(w3))\
.withColumn("cor", F.row_number().over(w5))\
.withColumn("maxcor", F.max("cor").over(w6))\
.withColumn("maxcor", F.when((F.col("result")=='24') | (F.col("result")=='1F'), F.lit(None)).otherwise(F.col("maxcor"))).filter('row_num=first or row_num=max or cor=maxcor')\
.select("DateTime", "UID","result","inc_sum")\
.withColumn("result", F.when(F.col("result")=='1F', F.lit(19)).otherwise(F.col("result")))\
.withColumn("all1", F.collect_list("result").over(w3))\
.withColumn("all", F.array(*[F.lit(x) for x in ['24','23','22','21','20','19']]))\
.withColumn("except", F.when(F.size("all1")!=F.size("all"),F.array_except("all","all1")).otherwise(F.array(F.lit(None))))\
.withColumn("except2", F.flatten(F.array("all1","except")))\
.withColumn("except2", F.expr("""filter(except2,x-> x!='null')""")).drop("all1","all","except")\
.groupBy("UID","inc_sum").agg(F.collect_list("DateTime").alias("DateTime"),F.collect_list("result").alias("result")\
                       ,F.first("except2").alias("except2"))\
.withColumn("zip", F.explode(F.arrays_zip("DateTime","result","except2")))\
.select("SensorId","zip.*","inc_sum")\
.withColumn("result", F.when(F.col("result").isNull(), F.col("except2")).otherwise(F.col("result")))\
.withColumn("DateTime", F.when(F.col("DateTime").isNull(), F.lit(0)).otherwise(F.col("DateTime")))\
.groupBy("UID").pivot("result").agg((F.collect_list("DateTime")))\
.withColumn("zipped", F.explode(F.arrays_zip(*['24','23','22','21','20','19'])))\
.select("UID", "zipped.*")\
.select("SensorId", F.col("24").alias("start_point"),F.col("23").alias("1st_change"),F.col("22").alias("2nd_change")\
,F.col("21").alias("3rd_change"),F.col("20").alias("4th_change"),F.col("19").alias("5th_change"))\
.replace('0',"datamiss").dropna()\
.show()

+--------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|UID. |        start_point|         1st_change|         2nd_change|         3rd_change|         4th_change|         5th_change|
+--------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|0000796D|2020-02-23 11:42:34|2020-02-28 11:52:35|2020-03-05 11:12:45|2020-03-12 11:02:45|2020-03-19 11:22:45|2020-04-22 12:17:45|
|0000796D|2020-05-12 12:26:45|2020-05-14 11:52:35|2020-05-16 11:52:35|2020-05-16 11:52:35|2020-05-20 11:52:35|2020-05-21 11:52:35|
|0000796D|2020-04-23 12:17:45|2020-04-28 12:22:45|2020-05-01 12:26:45|2020-05-03 12:26:45|2020-05-05 12:29:45|2020-05-09 12:26:45|
+--------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+

这篇关于如何检测pyspark数据框列中的模式何时发生变化的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆