重命名Spark数据框中的嵌套字段 [英] Rename nested field in spark dataframe

查看:124
本文介绍了重命名Spark数据框中的嵌套字段的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Spark中具有数据框df:

Having a dataframe df in Spark:

 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

如何将字段array_field.a重命名为array_field.a_renamed?

[更新]:

.withColumnRenamed()不适用于嵌套字段,因此我尝试了这种hacky和不安全的方法:

.withColumnRenamed() does not work with nested fields so I tried this hacky and unsafe method:

# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'

ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'

# Then set dataframe's schema with altered schema
df._schema = schema

我知道设置私有属性不是一个好习惯,但我不知道其他为df设置架构的方法

I know that setting a private attribute is not a good practice but I don't know other way to set the schema for df

我认为我在正确的轨道上,但是df.printSchema()仍然显示array_field.a的旧名称,尽管df.schema == schemaTrue

I think I am on a right track but df.printSchema() still shows the old name for array_field.a, though df.schema == schema is True

推荐答案

Python

不可能修改单个嵌套字段.您必须重新创建一个整体结构.在这种情况下,最简单的解决方案是使用cast.

It is not possible to modify a single nested field. You have to recreate a whole structure. In this particular case the simplest solution is to use cast.

首先进口一堆:

from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, LongType, StringType, StructField, StructType)

和示例数据:

Record = namedtuple("Record", ["a", "b", "c"])

df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])

让我们确认架构与您的案例相同:

Let's confirm that the schema is the same as in your case:

df.printSchema()

root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

您可以将新模式定义为例如字符串:

You can define a new schema for example as a string:

str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select(col("array_field").cast(str_schema)).printSchema()

root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

DataType:

struct_schema = ArrayType(StructType([
    StructField("a_renamed", StringType()),
    StructField("b", LongType()),
    StructField("c", LongType())
]))

 df.select(col("array_field").cast(struct_schema)).printSchema()

root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

scala

Scala中可以使用相同的技术:

The same techniques can be used in Scala:

case class Record(a: String, b: Long, c: Long)

val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select($"array_field".cast(strSchema))

import org.apache.spark.sql.types._

val structSchema = ArrayType(StructType(Seq(
    StructField("a_renamed", StringType),
    StructField("b", LongType),
    StructField("c", LongType)
)))

df.select($"array_field".cast(structSchema))

可能的改进:

如果您使用表达性数据操作或JSON处理库,则将数据类型转储到dict或JSON字符串并从那里获取它可能会更容易,例如(Python/

If you use an expressive data manipulation or JSON processing library it could be easier to dump data types to dict or JSON string and take it from there for example (Python / toolz):

from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter

# Update name to "a_updated" if name is "a"
rename_field = update_in(
    keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

updated_schema = pipe(
   #  Get schema of the field as a dict
   df.schema["array_field"].jsonValue(),
   # Update fields with rename
   update_in(
       keys=["type", "elementType", "fields"],
       func=lambda x: pipe(x, map(rename_field), list)),
   # Load schema from dict
   StructField.fromJson,
   # Get data type
   attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()

这篇关于重命名Spark数据框中的嵌套字段的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆