Pyspark explode JSON string


Problem description

Input_dataframe

id  name     collection
111 aaaaa    {"1":{"city":"city_1","state":"state_1","country":"country_1"},
              "2":{"city":"city_2","state":"state_2","country":"country_2"},
              "3":{"city":"city_3","state":"state_3","country":"country_3"}
             }
222 bbbbb    {"1":{"city":"city_1","state":"state_1","country":"country_1"},
              "2":{"city":"city_2","state":"state_2","country":"country_2"},
              "3":{"city":"city_3","state":"state_3","country":"country_3"}
              }

where:

id ==> string
name ==> string
collection ==> string (string representation of JSON_data)

I want something like this:

output_dataframe

id  name   key  value
111 aaaaa  "1"  {"city":"city_1","state":"state_1","country":"country_1"}
111 aaaaa  "2"  {"city":"city_2","state":"state_2","country":"country_2"}
111 aaaaa  "3"  {"city":"city_3","state":"state_3","country":"country_3"}
222 bbbbb  "1"  {"city":"city_1","state":"state_1","country":"country_1"}
222 bbbbb  "2"  {"city":"city_2","state":"state_2","country":"country_2"}
222 bbbbb  "3"  {"city":"city_3","state":"state_3","country":"country_3"}

If my collection attribute were of map or array type, the explode function would do the task. But collection is a string type (a string representation of JSON data).

How can I get output_dataframe? Please let me know.

NOTE: the collection attribute may have a nested and unpredictable schema, for example:

{
  "1": {"city":"city_1","state":"state_1","country":"country_1"},
  "2": {"city":"city_2","state":"state_2","country":"country_2",
        "a": {"aa":"111"}},
  "3": {"city":"city_3","state":"state_3"}
}

Recommended answer

Here is a hacky solution (not ideal, as it drops down to the underlying RDD), but I have tested it on the scenario where the schema is inconsistent and it seems to be robust:

from pyspark.sql import Row

rdd1 = df.rdd

# Re-parse the 'collection' string of every record into a dict
# (eval is the original hack; json.loads would be safer), then
# rebuild each record as a Row and convert back to a DataFrame.
(rdd1
    .map(lambda x: [(key, val) if key != 'collection' else (key, eval(val))
                    for key, val in x.asDict().items()])
    .map(lambda x: Row(**dict(x)))
    .toDF()
    .show())
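
If you then want output_dataframe exactly as shown above, a DataFrame-only alternative is to parse collection with from_json and explode the resulting map. This is a minimal sketch, not part of the original answer; it assumes Spark 2.2+ (where from_json accepts a MapType schema) and relies on from_json returning nested JSON objects as raw strings when the value type is StringType, which also copes with the unpredictable inner schema:

from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import MapType, StringType

# Parse the JSON string into map<string,string>; nested objects come
# back as raw JSON strings, so an inconsistent inner schema is fine.
parsed = df.withColumn(
    "collection",
    from_json("collection", MapType(StringType(), StringType()))
)

# explode on a map column yields one row per entry, with 'key' and
# 'value' columns, matching the shape of output_dataframe.
parsed.select("id", "name",
              explode("collection").alias("key", "value")) \
      .show(truncate=False)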

