PySpark:分组数据聚合中的自定义功能 [英] PySpark: custom function in aggregation on grouped data

查看:446
本文介绍了PySpark:分组数据聚合中的自定义功能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含以下内容的PySpark DataFrame

I have a PySpark DataFrame containing things as

Row(id='id1', type='A', status='H', keywords=['k1', 'k2', 'k3'])

状态是一个二进制选项( S / H)。
我需要做的是计算每种类型,ID和状态的每个关键字在状态 S 中出现的比率。
比率将为

Status is a binary option ('S'/'H'). what I need to do is counting the ratio of occurrences in status S per each keyword per type, id and status. Ratio will be

s/(s+h)

其中 s h 是这里的出现。
例如,如果在 A 类型中,关键字 k1 的出现次数是S的2倍和H的3倍,那么在这种情况下,我希望它的2/3而我的最终输出理想情况下将是

where s and h here are the occurrences. So for instance, if keyword k1 occurs 2 times as S and 3 times as H in type A I'll want 2/3 for it in that type and my final output would ideally be

Row(id='id1', type='A', keyword='k1', ratio=0.66)

我当时认为这必须经过几个步骤,

I was thinking this has to pass through several steps, and I'd be happy with computing the occurrences in S and H and then creating further column to ratio the two.

但我很乐意计算S和H中的出现次数,然后创建进一步的列以对这两者进行比例运算。但是在运行 groupBy后如何计算所说的出现次数是 id, type和 status吗?

But how would I compute the said occurrences after I run a groupBy by 'id', 'type' and 'status'? Would there be a way to run an agg with a custom function?

推荐答案

应该使用自定义函数来运行 agg 吗?做到这一点:

Something like this should do the trick:

from pyspark.sql.functions import explode, avg, col

ratio = avg(
    # If status "S" then 1.0 else 0.0
    (col("status") == "S").cast("double")
 ).alias("ratio")

(df
    .withColumn("keyword", explode("keywords"))
    .groupBy("id", "type", "keyword")
    .agg(ratio))

这篇关于PySpark:分组数据聚合中的自定义功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆