Explode list of dictionaries into additional columns in Spark

Problem Description

I currently have a UDF that takes a column of XML strings and parses it into lists of dictionaries. I then want to explode that list-of-dictionaries column into additional columns based on the key-value pairs.
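
(For context, a minimal sketch of what such a parsing UDF could look like - the XML layout below is purely hypothetical, since the actual format is not shown in the question:)

import xml.etree.ElementTree as ET
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, MapType, StringType

def parse_xml(xml_str):
    # Hypothetical layout: <records><rec><key1>value1</key1>...</rec>...</records>
    # Each <rec> element becomes one dictionary mapping tag name -> text.
    root = ET.fromstring(xml_str)
    return [{child.tag: child.text for child in rec} for rec in root]

# The UDF returns an array of string->string maps, i.e. the `parsed` column shown below.
parse_xml_udf = udf(parse_xml, ArrayType(MapType(StringType(), StringType())))
# df = df.withColumn('parsed', parse_xml_udf('xml_string'))  # `xml_string` is a hypothetical column name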

The input looks like this:

   id  type  length  parsed    
0  1   A     144     [{'key1':'value1'},{'key1':'value2', 'key2':'value3'},...]
1  1   B     20      [{'key1':'value4'},{'key2':'value5'},...]
2  4   A     54      [{'key3':'value6'},...]

And I want the output to look like this:

   id  type  length  key1             key2     key3
0  1   A     144     [value1,value2]  value3
1  1   B     20      value4           value5
2  4   A     54                                value6

I have been able to do this in Pandas like so:

import pandas as pd

# Explode the list-of-dicts column so each dictionary gets its own row
s = data['parsed xml'].explode()
# Turn each dictionary into columns, collect values per original row and key
# into lists, unwrap single-element lists, and pivot the keys into columns
# (missing keys become '')
df_join = (pd.DataFrame(s.tolist(), index=s.index)
             .stack()
             .groupby(level=[0, 1])
             .agg(list)
             .apply(lambda x: x[0] if len(x) == 1 else x)
             .unstack(fill_value='')
          )
# Join the widened key columns back onto the original frame
t = data.join(df_join, lsuffix='_x', rsuffix='_y')

The issue is that I am having trouble converting this Pandas code to Spark (I won't have Pandas available to me) so that it gives me the same result.

The Spark version I will have available is 1.6.0.

Recommended Answer

You can do this using explode twice - once to explode the array and once to explode the map elements of the array. Thereafter, you can use pivot with a collect_list aggregation.

from pyspark.sql.functions import explode, collect_list

# explode the array: one row per map element
df_1 = df.withColumn('exploded_arr', explode('parsed'))

# explode the maps of the array elements; the default column names returned
# after exploding a map are `key`, `value` - change them as needed
df_2 = df_1.select(*df_1.columns, explode('exploded_arr'))

# pivot with a collect_list aggregation
df_2.groupBy("id", "length", "type").pivot("key").agg(collect_list("value")).show()
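
For illustration, here is a minimal end-to-end sketch of the same approach, under the assumption of a Spark 1.6-style SQLContext (with an existing SparkContext `sc`) and sample rows mirroring the question's input:

from pyspark.sql import SQLContext
from pyspark.sql.functions import explode, collect_list

sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`

# sample rows mirroring the question's input; the list of dicts is inferred
# as an array of string->string maps
rows = [
    (1, 'A', 144, [{'key1': 'value1'}, {'key1': 'value2', 'key2': 'value3'}]),
    (1, 'B', 20,  [{'key1': 'value4'}, {'key2': 'value5'}]),
    (4, 'A', 54,  [{'key3': 'value6'}]),
]
df = sqlContext.createDataFrame(rows, ['id', 'type', 'length', 'parsed'])

result = (df.withColumn('exploded_arr', explode('parsed'))
            .select('id', 'type', 'length', explode('exploded_arr'))  # adds `key`, `value` columns
            .groupBy('id', 'type', 'length')
            .pivot('key')
            .agg(collect_list('value')))
result.show(truncate=False)

Note that collect_list keeps every pivoted column as an array, so a key with a single value comes back as a one-element list rather than the bare scalar shown in the desired output; unwrapping those would still need a small post-processing step.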
