How to find the average of an array column based on index in pyspark

Question

I have data as below:

-----------------------------
place  | key        | weights
----------------------------
amazon | lion       | [ 34, 23, 56 ]
north  | bear       | [ 90, 45]
amazon | lion       | [ 38, 30, 50 ]
amazon | bear       | [ 45 ]
amazon | bear       | [ 40 ]

I am trying to get a result like below:

-----------------------------
place  | key        | average
----------------------------
amazon | lion1      | 36.0      #(34 + 38)/2
amazon | lion2      | 26.5      #(23 + 30)/2
amazon | lion3      | 53.0      #(50 + 56)/2
north  | bear1      | 90        #(90)/1
north  | bear2      | 45        #(45)/1
amazon | bear1      | 42.5      #(45 + 40)/2

I get the point that first I have to do a groupby on the columns place and key, and then take the average of the array elements based on their indexes. For example, lion1 is the average of the 1st elements of the arrays [ 34, 23, 56 ] and [ 38, 30, 50 ].

I already have a solution using posexplode, but the problem is that in the real data the weights array column is very large. Since posexplode adds more rows, the data size increased enormously from 10 million rows to 1.2 billion, and it cannot be computed in a reliable time on the present cluster.
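
For reference, this is roughly what the posexplode approach described above looks like. It is only a sketch, assuming df holds the place, key and weights columns from the question; exploded and df_avg are illustrative names:

from pyspark.sql import functions as F

# explode each weights array into one (pos, weight) row per element
exploded = df.select('place', 'key', F.posexplode('weights').alias('pos', 'weight'))

# average per (place, key, position), then rebuild keys like 'lion1', 'lion2', ...
df_avg = (exploded
          .groupby('place', 'key', 'pos')
          .agg(F.mean('weight').alias('average'))
          .select('place',
                  F.concat('key', (F.col('pos') + 1).cast('string')).alias('key'),
                  'average'))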

I think it is better to add more columns than rows and then unpivot the columns, but I have no idea how to achieve that using pyspark or Spark SQL 2.2.1.

Answer

You can find the maximum number of elements in the array column with functions.size() and then expand that column:

  1. Set up the data

from pyspark.sql import functions as F

# note: the array column is named 'average' in this answer;
# it corresponds to the 'weights' column in the question
df = spark.createDataFrame([
      ('amazon', 'lion', [ 34, 23, 56 ])
    , ('north',  'bear', [ 90, 45])
    , ('amazon', 'lion', [ 38, 30, 50 ])
    , ('amazon', 'bear', [ 45 ])
    , ('amazon', 'bear', [ 40 ])
], ['place', 'key', 'average'])

  2. Find the max number of elements in the array column 'average'

    # the maximum array length across all rows determines how many columns are needed
    n = df.select(F.max(F.size('average')).alias('n')).first().n
    
    >>> n
    3
    

  3. Convert the array column into n columns

    # one column per array position: average[0] -> val_1, average[1] -> val_2, ...
    df1 = df.select('place', 'key', *[F.col('average')[i].alias('val_{}'.format(i+1)) for i in range(n)])
    
    >>> df1.show()
    +------+----+-----+-----+-----+
    | place| key|val_1|val_2|val_3|
    +------+----+-----+-----+-----+
    |amazon|lion|   34|   23|   56|
    | north|bear|   90|   45| null|
    |amazon|lion|   38|   30|   50|
    |amazon|bear|   45| null| null|
    |amazon|bear|   40| null| null|
    +------+----+-----+-----+-----+
    

  4. Calculate the mean aggregation on the new columns

    # column-wise mean per (place, key); nulls are ignored by F.mean
    df2 = df1.groupby('place', 'key').agg(*[ F.mean('val_{}'.format(i+1)).alias('average_{}'.format(i+1)) for i in range(n)])
    
    >>> df2.show()
    +------+----+---------+---------+---------+
    | place| key|average_1|average_2|average_3|
    +------+----+---------+---------+---------+
    |amazon|bear|     42.5|     null|     null|
    | north|bear|     90.0|     45.0|     null|
    |amazon|lion|     36.0|     26.5|     53.0|
    +------+----+---------+---------+---------+
    

  5. Unpivot the columns using select + union + reduce (an alternative with stack() is sketched after the output below)

    from functools import reduce
    
    # build one (place, key+index, average) slice per position, drop null averages,
    # then union the slices back into a single long-format result
    df_new = reduce(lambda x,y: x.union(y), [
        df2.select('place', F.concat('key', F.lit(i+1)).alias('key'), F.col('average_{}'.format(i+1)).alias('average')) \
           .dropna(subset=['average']) for i in range(n)
    ])
    
    >>> df_new.show()
    +------+-----+-------+
    | place|  key|average|
    +------+-----+-------+
    |amazon|bear1|   42.5|
    | north|bear1|   90.0|
    |amazon|lion1|   36.0|
    | north|bear2|   45.0|
    |amazon|lion2|   26.5|
    |amazon|lion3|   53.0|
    +------+-----+-------+
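
  As a side note, the same unpivot could also be written in a single pass with a stack() expression in selectExpr, which is available in Spark SQL 2.x; this is only a sketch, and stack_expr / df_new2 are illustrative names:

    # build "stack(3, '1', average_1, '2', average_2, '3', average_3) as (idx, average)"
    stack_expr = 'stack({}, {}) as (idx, average)'.format(
        n, ', '.join("'{0}', average_{0}".format(i + 1) for i in range(n)))

    df_new2 = df2.selectExpr('place', 'key', stack_expr) \
                 .dropna(subset=['average']) \
                 .select('place', F.concat('key', 'idx').alias('key'), 'average')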
    
