Pyspark - Looping through structType and ArrayType to do typecasting in the structfield


Problem description

I am quite new to pyspark and this problem is boggling me. Basically, I am looking for a scalable way to loop typecasting through a structType or ArrayType.

A sample of my data schema:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: integer (nullable = true)
 |    |-- rate_2: integer (nullable = true)
 |    |-- rate_3: integer (nullable = true)
 |    |-- card_fee: integer (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- rate_1: integer (nullable = true)
 |    |    |-- rate_2: integer (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)

As you can see here, card_rates is a struct and online_rates is an array of structs. I am looking for ways to loop through all the fields above and conditionally typecast them. Ideally, if it is supposed to be numeric, it should be converted to double; if it is supposed to be string, it should be converted to string. I need to loop because those rate_* fields may grow with time.

But for now, I am content with being able to loop them and typecast all of them to string, since I am very new to pyspark and still trying to get a feel for it.

My desired output schema:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: double (nullable = true)
 |    |-- rate_2: double (nullable = true)
 |    |-- rate_3: double (nullable = true)
 |    |-- card_fee: double (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- rate_1: double (nullable = true)
 |    |    |-- rate_2: double (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)

I am running out of ideas here.

I got my reference from here: PySpark convert struct field inside array to string

But this solution hardcodes the field and does not really loop over the fields.

Please help.

Answer

Here is one solution with the help of StructType.simpleString and the _parse_datatype_string built-in function:

from pyspark.sql.types import *
# _parse_datatype_string has a leading underscore, so the star import above
# does not bring it in; import it explicitly
from pyspark.sql.types import _parse_datatype_string

df_schema = StructType([
  StructField("_id", StringType(), True),
  StructField("created", TimestampType(), True),
  StructField("card_rates", StructType([
                  StructField("rate_1", IntegerType(), True),
                  StructField("rate_2", IntegerType(), True),
                  StructField("rate_3", IntegerType(), True),
                  StructField("card_fee", IntegerType(), True),
                  StructField("payment_method", StringType(), True)])),
  StructField("online_rates", ArrayType(
                  StructType(
                    [
                      StructField("rate_1", IntegerType(),True),
                      StructField("rate_2", IntegerType(),True),
                      StructField("online_fee", DoubleType(),True)
                    ]),True),True),
  StructField("updated", TimestampType(), True)])

schema_str = df_schema.simpleString() # this gives -> struct<_id:string,created:timestamp,card_rates:struct<rate_1:int,rate_2:int,rate_3:int,card_fee:int,payment_method:string>,online_rates:array<struct<rate_1:int,rate_2:int,online_fee:double>>,updated:timestamp>

double_schema = schema_str.replace(':int', ':double')

# convert back to StructType
final_schema = _parse_datatype_string(double_schema)
final_schema

  1. First convert your schema into a simple string with schema.simpleString
  2. Then replace all :int with :double
  3. Finally convert the modified string schema into StructType with _parse_datatype_string
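The replace step itself is plain string manipulation, so it can be sketched without a Spark session (the schema string below is copied from the simpleString output above):

```python
# simpleString form of the example schema, as produced above
schema_str = ("struct<_id:string,created:timestamp,"
              "card_rates:struct<rate_1:int,rate_2:int,rate_3:int,"
              "card_fee:int,payment_method:string>,"
              "online_rates:array<struct<rate_1:int,rate_2:int,online_fee:double>>,"
              "updated:timestamp>")

# Replace every ':int' type marker with ':double'.  The pattern is anchored
# on the ':' separator, so field *names* are left untouched.
double_schema = schema_str.replace(':int', ':double')

print(double_schema)
```

Note that this is purely textual: simpleString emits field names without backticks, so a name the DDL parser cannot read as a bare identifier (for example one containing a space or a dot) would make the _parse_datatype_string round-trip fail — which is the issue the update below addresses.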

UPDATE:

In order to avoid the issue with the backticks that @jxc pointed out, a better solution would be a recursive scan through the elements, as shown next:

def transform_schema(schema):

  if schema is None:
    return StructType()

  updated = []
  for f in schema.fields:
    if isinstance(f.dataType, IntegerType):
      # if IntegerType convert to DoubleType
      updated.append(StructField(f.name, DoubleType(), f.nullable))
    elif isinstance(f.dataType, ArrayType):
      # if ArrayType, unpack the element type, do recursion, then wrap the result
      # with ArrayType again, preserving containsNull and nullability
      updated.append(StructField(f.name, ArrayType(transform_schema(f.dataType.elementType), f.dataType.containsNull), f.nullable))
    elif isinstance(f.dataType, StructType):
      # if StructType, do recursion, preserving nullability
      updated.append(StructField(f.name, transform_schema(f.dataType), f.nullable))
    else:
      # else handle all the other cases i.e TimestampType, StringType etc
      updated.append(StructField(f.name, f.dataType, f.nullable))   

  return StructType(updated)

# call the function with your schema
transform_schema(df_schema)

Explanation: the function goes through each item of the schema (StructType) and tries to convert the int fields (StructField) into double. Finally, it delivers the converted schema (StructType) to the layer above (the parent StructType).
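The same recursion can also be expressed over the JSON form of the schema (StructType.jsonValue() / StructType.fromJson). The sketch below is a hypothetical pure-Python variant of transform_schema, not code from the answer — it manipulates only the plain dict representation, so it runs without a Spark session:

```python
def transform_json_schema(node):
    """Recursively rewrite 'integer' leaves to 'double' in the dict
    produced by StructType.jsonValue()."""
    if isinstance(node, dict):
        if node.get("type") == "struct" and "fields" in node:
            # a StructType node: recurse into every field
            return {**node, "fields": [transform_json_schema(f) for f in node["fields"]]}
        if node.get("type") == "array":
            # an ArrayType node: recurse into the element type only
            return {**node, "elementType": transform_json_schema(node["elementType"])}
        if "name" in node:
            # a StructField entry: rewrite its 'type' (string or nested dict)
            return {**node, "type": transform_json_schema(node["type"])}
        return node
    # a leaf type name: convert 'integer' to 'double', keep everything else
    return "double" if node == "integer" else node

# Usage against a real dataframe would then be (requires Spark):
#   new_schema = StructType.fromJson(transform_json_schema(df.schema.jsonValue()))
```

Because it only builds new dicts, the input schema dict is left unmodified.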

Output:

StructType(List(
  StructField(_id,StringType,true),
  StructField(created,TimestampType,true),
  StructField(card_rates,
              StructType(List(StructField(rate_1,DoubleType,true),
                              StructField(rate_2,DoubleType,true),
                              StructField(rate_3,DoubleType,true),
                              StructField(card_fee,DoubleType,true),
                              StructField(payment_method,StringType,true))),true),
  StructField(online_rates,ArrayType(
    StructType(List(
      StructField(rate_1,DoubleType,true),
      StructField(rate_2,DoubleType,true),
      StructField(online_fee,DoubleType,true))),true),true),
  StructField(updated,TimestampType,true)))

UPDATE (2020-02-02):

And here is one example of how to use the new schema together with the existing dataframe:

updated_schema = transform_schema(df.schema)

# cast each column to the new type
select_expr = [df[f.name].cast(f.dataType) for f in updated_schema.fields]

df.select(*select_expr).show()

