PySpark groupby and max value selection
Question
I have a PySpark dataframe like
name city date
satya Mumbai 13/10/2016
satya Pune 02/11/2016
satya Mumbai 22/11/2016
satya Pune 29/11/2016
satya Delhi 30/11/2016
panda Delhi 29/11/2016
brata BBSR 28/11/2016
brata Goa 30/10/2016
brata Goa 30/10/2016
I need to find out the most preferred CITY for each name. The logic is: take the city as fav_city if it has the maximum number of occurrences for the aggregated 'name' + 'city' pair. If multiple cities have the same (maximum) occurrence count, consider the city with the latest date. Will explain:
d = df.groupby('name','city').count()
#name city count
brata Goa 2 #clear favourite
brata BBSR 1
panda Delhi 1 #as single so clear favourite
satya Pune 2 ##Confusion
satya Mumbai 2 ##confusion
satya Delhi 1 ##should be discarded as other cities have a higher count than this city
#So get cities having max count
dd = d.groupby('name').agg(F.max('count').alias('count'))
ddd = dd.join(d,['name','count'],'left')
#name count city
brata 2 Goa #fav found
panda 1 Delhi #fav found
satya 2 Mumbai #can't say
satya 2 Pune #can't say
In the case of user 'satya' I need to go back to the transaction history and get the latest date among the cities having the equal max count, i.e. 'Mumbai' or 'Pune'; whichever was last transacted (max date) should be taken as fav_city. In this case 'Pune' wins, as '29/11/2016' is the latest/max date.
But I am not able to figure out how to proceed further to get that done.
Please help me with the logic, or if there is a better solution (a faster/more compact way), please suggest. Thanks.
Answer
First convert dates to DateType:
from pyspark.sql import functions as F

# Parse the dd/MM/yyyy strings and cast them to DateType
df_with_date = df.withColumn(
    "date",
    F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)
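If you are on Spark 2.2 or later, the same conversion can also be written more compactly with to_date; a minimal alternative sketch, not part of the original answer:

# Alternative on Spark 2.2+: parse the string directly to DateType
df_with_date = df.withColumn("date", F.to_date("date", "dd/MM/yyyy"))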
Next groupBy user and city, but extend the aggregation like this:
df_agg = (df_with_date
    .groupBy("name", "city")
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date")))
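This intermediate result is not shown in the original answer; for the sample data above it should look roughly like the following (dates as parsed, illustrative rather than actual run output):

# name  city    count  max_date
# brata Goa     2      2016-10-30
# brata BBSR    1      2016-11-28
# panda Delhi   1      2016-11-29
# satya Mumbai  2      2016-11-22
# satya Pune    2      2016-11-29
# satya Delhi   1      2016-11-30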
Define a window ordered by count (descending) and then by the latest date, which encodes the tie-breaking rule:
from pyspark.sql.window import Window
w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))
Add a rank:
df_with_rank = (df_agg
    .withColumn("rank", F.dense_rank().over(w)))
And filter:
result = df_with_rank.where(F.col("rank") == 1)
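For the sample data this should leave one favourite per name (Pune wins for satya because it has both the max count of 2 and the latest date, 29/11/2016); an illustrative check, not actual run output:

result.select("name", "city").show()
# Expected for the sample data:
#   brata -> Goa
#   panda -> Delhi
#   satya -> Pune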
You can detect remaining ties (names where more than one city still has rank 1) using code like this:
import sys

# Frame spanning every row in a name partition; count != 1 flags a tie
final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
result.withColumn("tie", F.count("*").over(final_w) != 1)
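On Spark 2.1+ the unbounded frame is more commonly written with the built-in Window constants instead of sys.maxsize; a minimal equivalent sketch, assuming the same result DataFrame:

# Same unbounded frame expressed with the Window constants
final_w = Window.partitionBy("name").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
result = result.withColumn("tie", F.count("*").over(final_w) != 1)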