How to sort columns of nested structs alphabetically in pyspark?

Question


I have data with the schema below. I want all of the columns to be sorted alphabetically, in a PySpark DataFrame.

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)

The code below sorts only the outer columns, not the nested ones.

>>> cols = df.columns
>>> df2 = df[sorted(cols)]
>>> df2.printSchema()

The schema after running this code looks like:

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

(Since _id starts with an underscore, which sorts before letters, it appears first.)

The schema I want is as follows (even the columns inside address should be sorted):

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

Thanks in advance.

Solution

Here is a solution that should work for arbitrarily deeply nested StructTypes and doesn't rely on hard-coding any column names.

To demonstrate, I've created the following slightly more complex schema, where there is a second level of nesting within the address column. Let's suppose that your DataFrame schema were the following:

df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- city: string (nullable = true)
# |    |-- zip: struct (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |    |    |-- first5: integer (nullable = true)
# |    |-- street: string (nullable = true)

Notice the address.zip field, which contains two out-of-order subfields.
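
For reference, here is one way such a DataFrame could be constructed for testing (a minimal sketch; the literal values are invented):

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType)

spark = SparkSession.builder.getOrCreate()

# Deliberately out-of-order fields, matching the demo schema above.
demo_schema = StructType([
    StructField("_id", StringType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("address", StructType([
        StructField("pin", IntegerType()),
        StructField("city", StringType()),
        StructField("zip", StructType([
            StructField("last4", IntegerType()),
            StructField("first5", IntegerType()),
        ])),
        StructField("street", StringType()),
    ])),
])

df = spark.createDataFrame(
    [("1", "Jane", "Doe", (110001, "Springfield", (6789, 12345), "Main St"))],
    demo_schema,
)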

You can define a function that will recursively step through your schema and sort the fields to build a Spark-SQL select expression:

from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    """Recursively walk the schema and return a list of Spark-SQL
    select expressions with all fields sorted alphabetically."""
    select_cols = []
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':
            # Recurse into the struct, sorting its sub-fields by name,
            # then rebuild it with a struct(...) expression.
            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'],
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))

            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            # Leaf field: qualify it with its parent path, if any.
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols
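
As a quick sanity check (on a hypothetical flat schema, not from the original answer), the function degenerates to a plain sorted list of column names:

from pyspark.sql.types import StructType, StructField, StringType

flat = StructType([StructField("b", StringType()),
                   StructField("a", StringType())])
print(schemaToSelectExpr(flat))
# ['a', 'b']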

Running this on the DataFrame's schema yields the following (I've broken the long 'address' string across two lines for readability):

print(schemaToSelectExpr(df.schema))
# ['_id',
#  'struct(address.city,address.pin,address.street,
#          struct(address.zip.first5,address.zip.last4) AS zip) AS address',
#  'first_name',
#  'last_name']
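
Since each string is an ordinary Spark SQL expression, the same list could also be passed to select via pyspark.sql.functions.expr, if you prefer that style (an equivalent sketch):

from pyspark.sql import functions as F

df_sorted = df.select([F.expr(e) for e in schemaToSelectExpr(df.schema)])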

Now use selectExpr to sort the columns:

df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |    |-- zip: struct (nullable = false)
# |    |    |-- first5: integer (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
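
Note that the rebuilt structs come back as nullable = false, which you can see for address and zip in the output above: the SQL struct(...) function always produces a non-null struct, even when the original column was nullable. If you'd rather avoid building SQL strings, the same recursion can be expressed with the DataFrame API; here is a sketch (sortColumns is a hypothetical helper, not part of the original answer):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType

def sortColumns(schema, prefix=""):
    # Return Column objects with every struct rebuilt in
    # alphabetical field order, recursing into nested structs.
    cols = []
    for field in sorted(schema, key=lambda f: f.name):
        path = prefix + field.name
        if isinstance(field.dataType, StructType):
            inner = sortColumns(field.dataType, prefix=path + ".")
            cols.append(F.struct(*inner).alias(field.name))
        else:
            cols.append(F.col(path).alias(field.name))
    return cols

df_sorted = df.select(sortColumns(df.schema))
df_sorted.printSchema()  # same sorted schema as above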
