PySpark: Read nested JSON from a String Type Column and create columns


Problem Description

I have a dataframe in PySpark with 3 columns - json, date and object_id:

-----------------------------------------------------------------------------------------
|json                                                              |date      |object_id|
-----------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-01|xyz123   |
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-02|xyz123   |
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}            |2020-08-03|xyz123   |
-----------------------------------------------------------------------------------------

Now I have a list of variables: [a.c.60, a.n.60, a.d, g.h]. I need to extract only these variables from the json column of above mentioned dataframe and to add those variables as columns in the dataframe with their respective values.

So in the end, the dataframe should look like:

-------------------------------------------------------------------------------------------------------
|json                                                    |date      |object_id|a.c.60|a.n.60|a.d |g.h|
-------------------------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-01|xyz123   |0     |null  |0.01|null|
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-02|xyz123   |null  |0     |0.01|null|
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}} |2020-08-03|xyz123   |null  |null  |null|0   |
-------------------------------------------------------------------------------------------------------

Please help to get this result dataframe. The main problem I am facing is due to no fixed structure for the incoming json data. The json data can be anything in nested form but I need to extract only the given four variables. I have achieved this in Pandas by flattening out the json string and then to extract the 4 variables but in Spark it is getting difficult.
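The flatten-then-pick approach described above can be sketched in plain Python (stdlib only, no Pandas or Spark; the helper name `flatten` is illustrative):

```python
import json

def flatten(d, prefix=""):
    """Recursively flatten a nested dict into dotted keys,
    e.g. {'a': {'b': 1}} -> {'a.b': 1}."""
    out = {}
    for k, v in d.items():
        key = f"{prefix}.{k}" if prefix else k
        if isinstance(v, dict):
            out.update(flatten(v, key))
        else:
            out[key] = v
    return out

wanted = ["a.c.60", "a.n.60", "a.d", "g.h"]
row = json.loads('{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}')
flat = flatten(row)
# Paths absent from this row come back as None (null in the dataframe).
extracted = {v: flat.get(v) for v in wanted}
print(extracted)  # {'a.c.60': 0, 'a.n.60': None, 'a.d': 0.01, 'g.h': None}
```

This works row by row in Pandas but does not parallelize in Spark, which is where the answer below comes in.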

Recommended Answer

There are two ways to do this:

  1. Use get_json_object:

    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                                '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                                '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                               StringType())
    
    df3 = df.select(F.get_json_object(F.col("value"), "$.a.c.60").alias("a_c_60"),
                    F.get_json_object(F.col("value"), "$.a.n.60").alias("a_n_60"),
                    F.get_json_object(F.col("value"), "$.a.d").alias("a_d"),
                    F.get_json_object(F.col("value"), "$.g.h").alias("g_h"))
    

    will give:

    >>> df3.show()
    +------+------+----+----+
    |a_c_60|a_n_60| a_d| g_h|
    +------+------+----+----+
    |     0|  null|0.01|null|
    |  null|     0|0.01|null|
    |  null|  null|null|   0|
    +------+------+----+----+
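
    Since the variables arrive as a list, the select expressions above can be generated rather than written by hand. A small sketch (the helper name `to_path_and_alias` is illustrative):

```python
def to_path_and_alias(var):
    """Map a dotted variable name like 'a.c.60' to the JSONPath '$.a.c.60'
    and a column alias 'a_c_60' for use with F.get_json_object."""
    return "$." + var, var.replace(".", "_")

variables = ["a.c.60", "a.n.60", "a.d", "g.h"]
specs = [to_path_and_alias(v) for v in variables]
print(specs)
# [('$.a.c.60', 'a_c_60'), ('$.a.n.60', 'a_n_60'), ('$.a.d', 'a_d'), ('$.g.h', 'g_h')]

# With Spark available, the select above becomes:
# df3 = df.select(*[F.get_json_object(F.col("value"), path).alias(alias)
#                   for path, alias in specs])
```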
    

    2. Declare the schema explicitly (only the necessary fields) and use from_json:

      from pyspark.sql.types import *
      import pyspark.sql.functions as F
      
      aSchema = StructType([
          StructField("c", StructType([
              StructField("60", DoubleType(), True)
          ]), True),
          StructField("n", StructType([
              StructField("60", DoubleType(), True)
          ]), True),
          StructField("d", DoubleType(), True),
      ])
      gSchema = StructType([
          StructField("h", DoubleType(), True)
      ])
      
      schema = StructType([
          StructField("a", aSchema, True),
          StructField("g", gSchema, True)
      ])
      
      df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                                  '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                                  '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                                 StringType())
      
      df2 = df.select(F.from_json("value", schema=schema).alias('data')).select('data.*')
      df2.select(df2.a.c['60'], df2.a.n['60'], df2.a.d, df2.g.h).show()
      

      will give:

      +------+------+----+----+
      |a.c.60|a.n.60| a.d| g.h|
      +------+------+----+----+
      |   0.0|  null|0.01|null|
      |  null|   0.0|0.01|null|
      |  null|  null|null| 0.0|
      +------+------+----+----+
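
      As a sanity check on the null pattern in both outputs, the path lookup that get_json_object performs can be mimicked with the stdlib (no Spark needed; `lookup` is an illustrative helper):

```python
import json

def lookup(obj, dotted):
    """Walk a nested dict along a dotted path; return None when any key is missing."""
    for key in dotted.split("."):
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    return obj

rows = [
    '{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
    '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
    '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}',
]
variables = ["a.c.60", "a.n.60", "a.d", "g.h"]
table = [[lookup(json.loads(r), v) for v in variables] for r in rows]
print(table)
# [[0, None, 0.01, None], [None, 0, 0.01, None], [None, None, None, 0]]
```

      Note that the third row yields null for a.d, since its 'd' key sits under 'g', not 'a'.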
      

