Pyspark: How to code a complicated Dataframe calculation


Problem description

The dataframe is already sorted by date,

the col1 == 1 value is unique,

and once col1 == 1 is reached, col1 increases by 1 on each following row (e.g. 1, 2, 3, 4, 5, 6, 7...); only the -1 values are duplicates.

I have a dataframe that looks like this; call it df:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# sample data: (date, col1, col2)
TEST_schema = StructType([StructField("date", StringType(), True),
                          StructField("col1", IntegerType(), True),
                          StructField("col2", IntegerType(), True)])
TEST_data = [('2020-08-01',-1,-1),('2020-08-02',-1,-1),('2020-08-03',-1,3),('2020-08-04',-1,2),('2020-08-05',1,4),
             ('2020-08-06',2,1),('2020-08-07',3,2),('2020-08-08',4,3),('2020-08-09',5,-1)]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df.show()



+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-01|  -1|  -1|
|2020-08-02|  -1|  -1|
|2020-08-03|  -1|   3|
|2020-08-04|  -1|   2|
|2020-08-05|   1|   4|
|2020-08-06|   2|   1|
|2020-08-07|   3|   2|
|2020-08-08|   4|   3|
|2020-08-09|   5|  -1|
+----------+----+----+

The condition is: when col1 == 1, we start from col2 == 4 and count backwards going up the dataframe (e.g. 4, 5, 6, 7, 8, ...), and every row after the col1 == 1 row gets 0 all the way down (e.g. 4, 0, 0, 0, 0, ...).

So my resulting df will look something like this:

    +----------+----+----+----+
    |      date|col1|col2|want|
    +----------+----+----+----+
    |2020-08-01|  -1|  -1|   8|
    |2020-08-02|  -1|  -1|   7|
    |2020-08-03|  -1|   3|   6|
    |2020-08-04|  -1|   2|   5|
    |2020-08-05|   1|   4|   4|
    |2020-08-06|   2|   1|   0|
    |2020-08-07|   3|   2|   0|
    |2020-08-08|   4|   3|   0|
    |2020-08-09|   5|  -1|   0|
    +----------+----+----+----+

Enhancement: I want to add an additional condition for when col2 == -1 at the col1 == 1 row (at 2020-08-05) and the -1s continue consecutively: count the consecutive -1s, and then add the col2 value at the row where the consecutive run breaks. Here's an example to make it clear.

    +----------+----+----+----+
    |      date|col1|col2|want|
    +----------+----+----+----+
    |2020-08-01|  -1|  -1|  11|
    |2020-08-02|  -1|  -1|  10|
    |2020-08-03|  -1|   3|   9|
    |2020-08-04|  -1|   2|   8|
    |2020-08-05|   1|  -1|  7*|
    |2020-08-06|   2|  -1|   0|
    |2020-08-07|   3|  -1|   0|
    |2020-08-08|   4|  4*|   0|
    |2020-08-09|   5|  -1|   0|
    +----------+----+----+----+

So we see 3 consecutive -1s (starting from 2020-08-05; we only care about the first consecutive run of -1s), and after the run we have 4 (at 2020-08-08, marked with *), so the col1 == 1 row would get 4 + 3 = 7. Is this possible?

**MY 1ST ATTEMPT**

import sys
from pyspark.sql import Window
from pyspark.sql.functions import col, when, sum  # Spark's sum, shadows the builtin

TEST_df = TEST_df.withColumn('cumsum', sum(when(col('col1') < 1, col('col1')) \
                 .otherwise(when(col('col1') == 1, 1).otherwise(0))).over(Window.partitionBy('col1').orderBy().rowsBetween(-sys.maxsize, 0)))
TEST_df.show()

+----------+----+----+------+
|      date|col1|col2|cumsum|
+----------+----+----+------+
|2020-08-01|  -1|  -1|    -1|
|2020-08-02|  -1|  -1|    -2|
|2020-08-03|  -1|   3|    -3|
|2020-08-04|  -1|   2|    -4|
|2020-08-05|   1|   4|     1|
|2020-08-07|   3|   2|     0|
|2020-08-09|   5|  -1|     0|
|2020-08-08|   4|   3|     0|
|2020-08-06|   2|   1|     0|
+----------+----+----+------+

from pyspark.sql.functions import desc, rank

w1 = Window.orderBy(desc('date'))
w2 = Window.partitionBy('case').orderBy(desc('cumsum'))

TEST_df.withColumn('case', sum(when((col('cumsum') == 1) & (col('col2') != -1), col('col2')) \
       .otherwise(0)).over(w1)) \
  .withColumn('rank', when(col('case') != 0, rank().over(w2) - 1).otherwise(0)) \
  .withColumn('want', col('case') + col('rank')) \
  .orderBy('date') \
  .show(truncate=False)
+----------+----+----+------+----+----+----+
|date      |col1|col2|cumsum|case|rank|want|
+----------+----+----+------+----+----+----+
|2020-08-01|-1  |-1  |-1    |4   |1   |5   |
|2020-08-02|-1  |-1  |-2    |4   |2   |6   |
|2020-08-03|-1  |3   |-3    |4   |3   |7   |
|2020-08-04|-1  |2   |-4    |4   |4   |8   |
|2020-08-05|1   |4   |1     |4   |0   |4   |
|2020-08-06|2   |1   |0     |0   |0   |0   |
|2020-08-07|3   |2   |0     |0   |0   |0   |
|2020-08-08|4   |3   |0     |0   |0   |0   |
|2020-08-09|5   |-1  |0     |0   |0   |0   |
+----------+----+----+------+----+----+----+

You can see the rank values 1, 2, 3, 4; if I could make them 4, 3, 2, 1 instead, the result would look like my desired dataframe... how do I reverse it? I tried orderBy both asc and desc... and of course this is all before the enhancement.

Answer

IIUC, you can try the following:

  1. groupby and create a collect_list of all related rows (vals in the code below), sorting the list by date in descending order. (Note: change groupby(lit(1)) to whatever columns you can use to divide your data into independent subsets.)

  2. Find the array index (idx in the code below) of the entry having col1 == 1.

  3. If col2 == -1 at idx, then find the offset from idx toward the beginning of the list, up to the first entry having col2 != -1. (Note: in the current code, offset might be NULL if all col2 before idx are -1; you will have to decide what you want, for example by using coalesce(IF(...),0).)

After we have offset and idx, the want column can be calculated by:

IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)

  • Use the SparkSQL function inline to explode the array of structs.

    Note: The same logic can be applied with a Window function in case too many columns exist in your production dataframe.

    The code below:

    from pyspark.sql.functions import sort_array, collect_list, struct, expr, lit
    
    TEST_df = spark.createDataFrame([
      ('2020-08-01', -1, -1), ('2020-08-02', -1, -1), ('2020-08-03', -1, 3),
      ('2020-08-04', -1, 2), ('2020-08-05', 1, -1), ('2020-08-06', 2, -1),
      ('2020-08-07', 3, -1), ('2020-08-08', 4, 4), ('2020-08-09', 5, -1)
    ], ['date', 'col1', 'col2'])
    
    # list of columns used in the calculation
    cols = ["date", "col1", "col2"]
    
    df_new = TEST_df \
        .groupby(lit(1)) \
        .agg(sort_array(collect_list(struct(*cols)),False).alias('vals')) \
        .withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
        .withColumn('offset', expr("""
            coalesce(IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0),0)
         """)).selectExpr("""
           inline(
             transform(vals, (x,i) -> named_struct(
                 'date', x.date,
                 'col1', x.col1,
                 'col2', x.col2,
                 'want', IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)
               )
             )
        )""")
    

    Output:

    df_new.orderBy('date').show()
    +----------+----+----+----+
    |      date|col1|col2|want|
    +----------+----+----+----+
    |2020-08-01|  -1|  -1|  11|
    |2020-08-02|  -1|  -1|  10|
    |2020-08-03|  -1|   3|   9|
    |2020-08-04|  -1|   2|   8|
    |2020-08-05|   1|  -1|   7|
    |2020-08-06|   2|  -1|   0|
    |2020-08-07|   3|  -1|   0|
    |2020-08-08|   4|   4|   0|
    |2020-08-09|   5|  -1|   0|
    +----------+----+----+----+
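
    To see how the formula IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx) produces the want values above, here is a minimal pure-Python sketch of the same arithmetic on the enhanced sample data (vals sorted by date descending, exactly as the Spark code builds it). The variable names mirror vals, idx and offset from the Spark code; the snippet is for illustration only:

    # pure-Python check of the `want` formula (illustrative only)
    vals = [  # (date, col1, col2), sorted by date descending
        ('2020-08-09', 5, -1), ('2020-08-08', 4, 4), ('2020-08-07', 3, -1),
        ('2020-08-06', 2, -1), ('2020-08-05', 1, -1), ('2020-08-04', -1, 2),
        ('2020-08-03', -1, 3), ('2020-08-02', -1, -1), ('2020-08-01', -1, -1),
    ]

    # idx: position of the entry with col1 == 1  -> 4 (2020-08-05)
    idx = next(i for i, v in enumerate(vals) if v[1] == 1)

    # offset: steps from idx back to the first entry with col2 != -1  -> 3 (2020-08-08, col2 = 4)
    offset = (next((i for i in range(1, idx + 1) if vals[idx - i][2] != -1), 0)
              if vals[idx][2] == -1 else 0)

    # IF(i < idx, 0, vals[idx-offset].col2 + offset + i - idx)
    want = [0 if i < idx else vals[idx - offset][2] + offset + i - idx
            for i in range(len(vals))]
    print(want)  # [0, 0, 0, 0, 7, 8, 9, 10, 11] -> 7 at 2020-08-05 up to 11 at 2020-08-01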
    

    Per comments, added an alternative that uses a Window aggregate function instead of groupby:

    from pyspark.sql import Window
    
    # WindowSpec to cover all related Rows in the same partition
    w1 = Window.partitionBy().orderBy('date').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
    
    cols = ["date", "col1", "col2"]
    
    # below `cur_idx` is the index for the current Row in array `vals`
    df_new = TEST_df.withColumn('vals', sort_array(collect_list(struct(*cols)).over(w1),False)) \
        .withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
        .withColumn('offset', expr("IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0)")) \
        .withColumn("cur_idx", expr("array_position(vals, struct(date,col1,col2))-1")) \
        .selectExpr(*TEST_df.columns, "IF(cur_idx<idx, 0, vals[idx-offset].col2 + offset + cur_idx - idx) as want")
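
    As a quick check, and assuming the same TEST_df plus the sort_array, collect_list, struct and expr imports from the first snippet, displaying the result should match the groupby version's output:

    df_new.orderBy('date').show()

    The Window variant keeps every original column on each row and only appends want, so you do not have to pack all of your production columns into the struct and explode them back out; that is presumably the point of the Note above about dataframes with many columns.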
    
