Casting a column to JSON/dict and flattening JSON values in a column in pyspark


Question

I am new to Pyspark and I am figuring out how to cast a column type to dict type and then flatten that column to multiple columns using explode.

Here's what my dataframe looks like:

col1    | col2
--------|------------------------------------------------------------
test:1  | {"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}]},{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]}
test:2  | {"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}

Right now, the schema of this dataframe is

root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)

The output I am looking to have is like this:

col1   | col2           | Id | cName | pScore  |
------------------------------------------------
test:1 | test1          | 17 | c1    | null    | 
test:1 | test1          | 01 | c2    | 0.003609|
test:1 | test8          | 1  | c11   | 0.0     |
test:1 | test8          | 012| c2    | 0.003609|
test:2 | test1:subtest2 | 18 | c13   | 0.00203 | 

I am having trouble defining the right schema for col2 to cast its type from String to a JSON object or dict. And then, I would like to be able to explode the values to multiple columns as shown above. Any help would be greatly appreciated. I am using Spark 2.0+.
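One reason a plain `from_json` cast is tricky here is that row `test:1`'s col2 is not a single valid JSON document: it is two JSON objects joined by a comma. A minimal pure-Python sketch (using a sample string mirroring that row) shows the problem and how matching complete `{...]}` objects with a regex recovers each piece:

```python
import json
import re

# Sample mirroring row "test:1": two JSON objects joined by a comma,
# which is NOT one valid JSON document.
cell = ('{"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}]},'
        '{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]}')

try:
    json.loads(cell)          # raises: extra data after the first object
    whole_is_valid = True
except json.JSONDecodeError:
    whole_is_valid = False

# Lazily matching up to each "}]}"" boundary yields each object as valid JSON.
pieces = [json.loads(m.group()) for m in re.finditer(r'\{.*?\}\]\}', cell)]
```

This is why the accepted answer parses col2 with a regex inside a udf rather than relying on a single schema cast.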

Thanks!

Answer

Updating my answer: I used a udf to put each top-level key into the array of structs, then explode to reach the desired output.

See the example below:

import json
import re

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    ('test:1',
     '{"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}]},'
     '{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]}'),
    ('test:2', '{"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}')
], ['col1', 'col2'])

schema = ArrayType(
    StructType(
        [
            StructField("Col", StringType()),
            StructField("Id", StringType()),
            StructField("cName", StringType()),
            StructField("pScore", DoubleType())
        ]
    )
)


@f.udf(returnType=schema)
def parse_col(column):
    updated_values = []

    # col2 may hold several JSON objects joined by commas, so it is not
    # necessarily one valid JSON document; match each complete {...]} object.
    for it in re.finditer(r'{.*?}]}', column):
        parse = json.loads(it.group())
        for key, values in parse.items():
            for value in values:
                # Carry the top-level key (e.g. "test1") along as the Col field.
                value['Col'] = key
                updated_values.append(value)

    return updated_values


# Parse col2 into an array of structs, explode to one struct per row,
# then promote the struct fields to top-level columns.
df = df \
    .withColumn('tmp', parse_col(f.col('col2'))) \
    .withColumn('tmp', f.explode(f.col('tmp'))) \
    .select(f.col('col1'),
            f.col('tmp').Col.alias('col2'),
            f.col('tmp').Id.alias('Id'),
            f.col('tmp').cName.alias('cName'),
            f.col('tmp').pScore.alias('pScore'))

df.show()

Output:

+------+--------------+---+-----+--------+
|  col1|          col2| Id|cName|  pScore|
+------+--------------+---+-----+--------+
|test:1|         test1| 17|   c1|    null|
|test:1|         test1| 01|   c2|0.003609|
|test:1|         test8|  1|  c11|     0.0|
|test:1|         test8|012|   c2|0.003609|
|test:2|test1:subtest2| 18|  c13| 0.00203|
+------+--------------+---+-----+--------+
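The udf's core logic is plain Python, so it can be sanity-checked without a Spark session. A standalone sketch applying the same parse to the two sample rows from the question reproduces the five output rows:

```python
import json
import re

def parse_col(column):
    # Same logic as the udf body: find each complete {...]} object,
    # then tag every inner record with its top-level key.
    updated_values = []
    for it in re.finditer(r'{.*?}]}', column):
        parse = json.loads(it.group())
        for key, values in parse.items():
            for value in values:
                value['Col'] = key
                updated_values.append(value)
    return updated_values

rows = [
    ('test:1',
     '{"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}]},'
     '{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]}'),
    ('test:2', '{"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}'),
]

# Flatten: one tuple per inner record, with None where pScore is absent,
# matching the nulls Spark produces for the missing struct field.
flat = [(col1, rec['Col'], rec['Id'], rec['cName'], rec.get('pScore'))
        for col1, col2 in rows for rec in parse_col(col2)]
```

Note that `Id` stays a string (`"01"`, `"012"`), preserving leading zeros, because the schema declares it as `StringType`.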
