Pyspark爆炸JSON字符串 [英] Pyspark explode json string
本文介绍了Pyspark爆炸JSON字符串的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
Input_dataframe
id name collection
111 aaaaa {"1":{"city":"city_1","state":"state_1","country":"country_1"},
"2":{"city":"city_2","state":"state_2","country":"country_2"},
"3":{"city":"city_3","state":"state_3","country":"country_3"}
}
222 bbbbb {"1":{"city":"city_1","state":"state_1","country":"country_1"},
"2":{"city":"city_2","state":"state_2","country":"country_2"},
"3":{"city":"city_3","state":"state_3","country":"country_3"}
}
这里
id ==> string
name ==> string
collection ==> string (string representation of JSON_data)
我想要这样的东西
output_dataframe
id name key value
111 aaaaa "1" {"city":"city_1","state":"state_1","country":"country_1"},
111 aaaaa "2" {"city":"city_2","state":"state_2","country":"country_2"},
111 aaaaa "3" {"city":"city_3","state":"state_3","country":"country_3"}
222 bbbbb "1" {"city":"city_1","state":"state_1","country":"country_1"},
222 bbbbb "2" {"city":"city_2","state":"state_2","country":"country_2"},
222 bbbbb "3" {"city":"city_3","state":"state_3","country":"country_3"}
如果我的 collection
属性类型是 map
或 array
,那么 explode
函数将完成我的任务.但是我有 collection
作为字符串类型(JSON_data)
if my collection
attribute type is either map
or array
then explode
function will do my task. But i have collection
as a string type(JSON_data)
如何获取output_dataframe?
请让我知道
注意集合属性可能具有嵌套且不可预测的架构.
NOTE collection attribute may have nested and unpredictable schema.
{
"1":{"city":"city_1","state":"state_1","country":"country_1"},
"2":{"city":"city_2","state":"state_2","country":"country_2","a":
{"aa":"111"}},
"3":{"city":"city_3","state":"state_3"}
}
推荐答案
这是一个很棘手的解决方案(因为它使用底层的 RDD
,所以并不理想),但是我已经在架构进行了测试是不一致的,而且似乎很健壮:
Here is a hacky solution (not ideal as it uses the underlying RDD
) but I have tested it on the scenario where the schema is inconsistent and it seems to be robust:
from pyspark.sql import Row
rdd1 = df.rdd
rdd1.map(lambda x: [(key, val) if key != 'collection' else (key, eval(val))
for key, val in x.asDict().items()])
.map(lambda x: Row(**dict(x)))
.toDF().show()
这篇关于Pyspark爆炸JSON字符串的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文