pyspark dataframe with json column to aggregate the json elements into a new column and remove duplicates
Question
I am trying to read a PySpark dataframe with a JSON column on Databricks.
Dataframe:
year month json_col
2010 09 [{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]
2010 09 [{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]
2007 10 [{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]
2007 10 [{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]
I need a new dataframe, aggregated by year and month, with all duplicated "p_id" values removed:
year month p_id (string)
2010 09 ["vfdvtbe", "cdscs", "usdvwq", "ujhbe", "yjev"]
2007 10 ["ukerge", "ikrtw", "ikwca", "unvwq", "cqwcq"]
The new column "p_id" is a JSON array string. I would like to count which distinct "p_id" values occur in each year and month, and how many of them there are, and also remove the duplicated elements that appear in the same year and month.
My code:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

schema = ArrayType(MapType(StringType(), StringType()))

t = (ff.withColumn("data", F.explode(F.from_json(F.col("json_col"), schema)))
       .withColumn("data", F.when(F.col("data")["p_id"].cast("string").isNotNull(),
                                  F.col("data")["p_id"]))
       .filter(F.col("data").isNotNull())
       .drop("json_col"))
display(t)
I am not sure whether this can remove the duplicates?
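As a quick sanity check (outside Spark), the sample data shows why exploding alone cannot remove the duplicates: the repeated value "cdscs" sits in two different rows of the same year/month, so the duplication is only visible after grouping. A minimal pure-Python illustration of the two 2010-09 rows:

```python
import json

# The two 2010-09 rows from the sample data above
rows = [
    '[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]',
    '[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]',
]

# "Exploding" each row on its own never sees the cross-row duplicate
per_row = [[d["p_id"] for d in json.loads(r)] for r in rows]
print(per_row)  # [['vfdvtbe', 'cdscs', 'usdvwq'], ['ujhbe', 'cdscs', 'yjev']]

# Only after merging rows of the same year/month does "cdscs" repeat
merged = [p for row in per_row for p in row]
assert merged.count("cdscs") == 2
```

So an aggregation step (group by year and month) is required before deduplication can do its job.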
Thanks
Recommended Answer
Use the flatten and array_distinct functions together with groupBy and collect_list for this case.
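To make the two array functions concrete, here are pure-Python analogues (illustrative only; the helper names mirror the Spark functions but are local definitions, not PySpark imports):

```python
def flatten(arrays):
    # Like Spark's flatten: removes one level of nesting from an array of arrays
    return [x for arr in arrays for x in arr]

def array_distinct(arr):
    # Like Spark's array_distinct: drops duplicates, keeping first-seen order
    return list(dict.fromkeys(arr))

print(flatten([["a", "b"], ["b", "c"]]))     # ['a', 'b', 'b', 'c']
print(array_distinct(["a", "b", "b", "c"]))  # ['a', 'b', 'c']
```

collect_list gathers each group's arrays into an array of arrays, flatten merges them into one array, and array_distinct removes the duplicates.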
Example:
df.show(10,False)
#+----+-----+---------------------------------------------------------+
#|year|month|json_col |
#+----+-----+---------------------------------------------------------+
#|2010|09 |[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]|
#|2010|09 |[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}] |
#|2007|10 |[{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}] |
#|2007|10 |[{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}] |
#+----+-----+---------------------------------------------------------+
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
from pyspark.sql.functions import array_distinct, col, collect_list, expr, flatten, from_json, to_json

schema = ArrayType(StructType([
    StructField('p_id', StringType(), True)
]))

df1 = df.withColumn("ff", from_json(col("json_col"), schema)).\
    select("year", "month", expr('transform(ff, f -> f.p_id)').alias("tmp"))

df1.groupBy("year", "month").\
    agg(to_json(array_distinct(flatten(collect_list(col("tmp"))))).alias("p_id")).\
    show(10, False)
#+----+-----+-------------------------------------------+
#|year|month|p_id |
#+----+-----+-------------------------------------------+
#|2010|09 |["vfdvtbe","cdscs","usdvwq","ujhbe","yjev"]|
#|2007|10 |["ukerge","ikrtw","ikwca","unvwq","cqwcq"] |
#+----+-----+-------------------------------------------+
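The question also asks how many distinct "p_id" values each year/month has; in Spark that would be a matter of wrapping the deduplicated array in size() before (or alongside) to_json, though that step is not part of the answer above. The whole pipeline can be verified in plain Python against the sample data (illustrative sketch, not Spark code):

```python
import json
from collections import defaultdict

# The (year, month, json_col) rows exactly as in the question
rows = [
    ("2010", "09", '[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]'),
    ("2010", "09", '[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]'),
    ("2007", "10", '[{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]'),
    ("2007", "10", '[{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]'),
]

# groupBy(year, month) + collect_list + flatten
grouped = defaultdict(list)
for year, month, json_col in rows:
    grouped[(year, month)].extend(d["p_id"] for d in json.loads(json_col))

# array_distinct (order-preserving) plus the requested count per group
result = {k: list(dict.fromkeys(v)) for k, v in grouped.items()}
for (year, month), p_ids in result.items():
    print(year, month, json.dumps(p_ids), "count:", len(p_ids))
```

This reproduces the two output rows shown above, with counts 5 for 2010-09 and 5 for 2007-10.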