Explode list of dictionaries into additional columns in Spark
Question

I currently have a UDF that takes a column of XML strings and parses it into lists of dictionaries. I then want to explode that list-of-dictionaries column out into additional columns based on the key-value pairs.
The input looks like this:
   id  type  length  parsed
0  1   A     144     [{'key1':'value1'}, {'key1':'value2', 'key2':'value3'}, ...]
1  1   B     20      [{'key1':'value4'}, {'key2':'value5'}, ...]
2  4   A     54      [{'key3':'value6'}, ...]
And I want the output to look like this:
   id  type  length  key1              key2    key3
0  1   A     144     [value1,value2]   value3
1  1   B     20      value4            value5
2  4   A     54                                value6
I have been able to do this in Pandas like so:
import pandas as pd

# flatten the list column: one row per dict, with the original row index repeated
s = data['parsed xml'].explode()
# expand each dict into columns, stack to (row, key) pairs,
# collect the values per pair, and unwrap single-element lists
df_join = (pd.DataFrame(s.tolist(), index = s.index)
           .stack()
           .groupby(level=[0,1])
           .agg(list)
           .apply(lambda x: x[0] if len(x)==1 else x)
           .unstack(fill_value='')
           )
# join the pivoted key columns back onto the original frame
t = data.join(df_join, lsuffix = '_x', rsuffix = '_y')
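For reference, a self-contained sketch of the Pandas pipeline above, run on a reconstruction of the sample input (the column name `parsed xml` and the sample values are taken from the question; the extra `.dropna()` is added defensively for newer pandas versions, where `stack` may keep NaN entries instead of dropping them):

```python
import pandas as pd

# Reconstruction of the question's sample input
data = pd.DataFrame({
    'id': [1, 1, 4],
    'type': ['A', 'B', 'A'],
    'length': [144, 20, 54],
    'parsed xml': [
        [{'key1': 'value1'}, {'key1': 'value2', 'key2': 'value3'}],
        [{'key1': 'value4'}, {'key2': 'value5'}],
        [{'key3': 'value6'}],
    ],
})

# one row per dict; the original row index repeats
s = data['parsed xml'].explode()

df_join = (pd.DataFrame(s.tolist(), index=s.index)
           .stack()                  # (row, key) pairs
           .dropna()                 # defensive: some pandas versions keep NaN here
           .groupby(level=[0, 1])
           .agg(list)                # collect all values per (row, key)
           .apply(lambda x: x[0] if len(x) == 1 else x)  # unwrap singletons
           .unstack(fill_value=''))

t = data.join(df_join, lsuffix='_x', rsuffix='_y')
print(t)
```

Note that a key seen more than once per row (like `key1` in row 0) stays a list, while keys seen once come out as plain values.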
The issue is that I am having trouble converting this Pandas code to Spark (Pandas won't be available to me) in a way that gives the same result.
The Spark version I will have available is 1.6.0.
Answer
You can do this using explode twice - once to explode the array and once to explode the map elements of the array. Thereafter, you can use pivot with a collect_list aggregation.
from pyspark.sql.functions import explode, collect_list

# explode the array: one row per dictionary
df_1 = df.withColumn('exploded_arr', explode('parsed'))

# explode the maps of the array elements; the default column names returned
# after exploding a map are `key` and `value` - rename them as needed
df_2 = df_1.select(*df_1.columns, explode('exploded_arr'))

# pivot on the key with a collect_list aggregation
df_2.groupBy("id", "length", "type").pivot("key").agg(collect_list("value")).show()
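If Spark isn't at hand to try this, the effect of the two explodes plus the pivot can be traced in plain Python on the sample rows (a stand-in for the Spark job, not a replacement - the variable names and sample values here are illustrative):

```python
from collections import defaultdict

# sample rows: (id, type, length, parsed), as in the question's input
rows = [
    (1, 'A', 144, [{'key1': 'value1'}, {'key1': 'value2', 'key2': 'value3'}]),
    (1, 'B', 20,  [{'key1': 'value4'}, {'key2': 'value5'}]),
    (4, 'A', 54,  [{'key3': 'value6'}]),
]

# first explode: one row per dictionary in the array
step1 = [(i, t, l, d) for (i, t, l, arr) in rows for d in arr]

# second explode: one row per (key, value) pair of each map
step2 = [(i, t, l, k, v) for (i, t, l, d) in step1 for k, v in d.items()]

# pivot("key").agg(collect_list("value")): group by (id, type, length)
# and collect the values seen for each key into a list
pivoted = defaultdict(lambda: defaultdict(list))
for (i, t, l, k, v) in step2:
    pivoted[(i, t, l)][k].append(v)
```

One difference from the Pandas version above: collect_list always produces an array column, so single values come out as one-element lists rather than plain scalars.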