How to transform a few columns of a dataframe to a tuple


Problem description


I have a dataframe that looks like this

+---+------+------+---+--------+
| Id|fomrid|values|occ|comments|
+---+------+------+---+--------+
|  1|    x1|  22.0|  1|   text1|
|  1|    x1|  test|  2|   text2|
|  1|    x1|    11|  3|   text3|
|  1|    x2|    21|  0|   text4|
|  2|    p1|     1|  1|   text5|
+---+------+------+---+--------+

The transformed dataframe should look like this:

+---+------+----------------+----------+--------+
| Id|fomrid|   tuple(values)|tuple(occ)|comments|
+---+------+----------------+----------+--------+
|  1|    x1|(22.0, test, 11)| (1, 2, 3)|   text1|
|  1|    x2|            (21)|       (0)|   text4|
|  2|    p1|             (1)|       (1)|   text5|
+---+------+----------------+----------+--------+

Solution

At the time of writing, it should be noted that Spark does not have a data/column type for tuples; the closest representation is a list (array).

However, if you would like your data represented as a tuple string, you can use collect_list to group your items, cast the result to a string, and replace the square brackets with parentheses.
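For reference, here is a minimal sketch to reproduce the sample dataframe used by the snippets below (assuming a SparkSession named spark, and that all columns are read as strings):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows matching the input table above; all columns kept as strings
input_df = spark.createDataFrame(
    [
        ("1", "x1", "22.0", "1", "text1"),
        ("1", "x1", "test", "2", "text2"),
        ("1", "x1", "11", "3", "text3"),
        ("1", "x2", "21", "0", "text4"),
        ("2", "p1", "1", "1", "text5"),
    ],
    ["Id", "fomrid", "values", "occ", "comments"],
)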

Using Spark SQL

Create a temporary view and run the SQL below on your Spark session:

input_df.createOrReplaceTempView("my_temp_table_or_view")
output_df = sparkSession.sql("<insert sql below here>")

SELECT
    Id,
    fomrid,
    CONCAT('(',REGEXP_REPLACE(CAST(collect_list(values) AS STRING),'[\\\[\\\]]',''),')') as tuple_values,
    CONCAT('(',REGEXP_REPLACE(CAST(collect_list(occ) AS STRING),'[\\\[\\\]]',''),')') as tuple_occ,
    MIN(comments) as comments
FROM
   my_temp_table_or_view
GROUP BY
   Id, fomrid

Output

+---+------+----------------+---------+--------+
| Id|fomrid|    tuple_values|tuple_occ|comments|
+---+------+----------------+---------+--------+
|  1|    x1|(22.0, test, 11)|(1, 2, 3)|   text1|
|  1|    x2|            (21)|      (0)|   text4|
|  2|    p1|             (1)|      (1)|   text5|
+---+------+----------------+---------+--------+

Using the PySpark API

from pyspark.sql import functions as F

output_df = (
    input_df.groupBy(["Id","fomrid"])
            .agg(
                F.concat(
                    F.lit('('),
                    F.regexp_replace(
                        F.collect_list("values").cast("STRING"),
                        '[\\[\\]]',
                        ''
                    ),
                    F.lit(')')
                ).alias("tuple_values"),
                F.concat(
                    F.lit('('),
                    F.regexp_replace(
                        F.collect_list("occ").cast("STRING"),
                        '[\\[\\]]',
                        ''
                    ),
                    F.lit(')')
                ).alias("tuple_occ"),
                F.min("comments").alias("comments")               
            )
)
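As with the SQL version, you can verify the result with output_df.show(truncate=False); it should print the same tuple-string table shown above.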

Using the Scala Spark API

import org.apache.spark.sql.functions._

val output_df =
    input_df.groupBy("Id","fomrid")
            .agg(
                concat(
                    lit("("),
                    regexp_replace(
                        collect_list("values").cast("STRING"),
                        "[\\[\\]]",
                        ""
                    ),
                    lit(")")
                ).alias("tuple_values"),
                concat(
                    lit("("),
                    regexp_replace(
                        collect_list("occ").cast("STRING"),
                        "[\\[\\]]",
                        ""
                    ),
                    lit(")")
                ).alias("tuple_occ"),
                min("comments").alias("comments")               
            )

Edit 1:

At the time of writing, and as it pertains to wanting a tuple column type for your Pig script: neither Spark nor Parquet natively supports a tuple type. Pig uses special writers/readers to convert Parquet fields when using this type.

It may be better to modify or use an intermediary Pig script to:

  1. read, transform, and store the data in the desired type, as in this example, or
  2. convert a string to a tuple, as in this example.
  3. Alternatively, you could try saving as a struct type and seeing whether that works for you. I've included a snippet below, written in PySpark, that dynamically determines and creates a struct type for the fields List_values and List_occ. Although I have tested this code on my Spark setup, I do not have a Pig setup at this time. Please let me know whether any of these approaches, including the links above, works for you.

from pyspark.sql import functions as F

output_df = (
    input_df.groupBy("Id","fomrid")
            .agg(
                F.collect_list("values").alias("List_values"),
                F.collect_list("occ").alias("List_occ"),
                F.min("comments").alias("comments")               
            )
)
output_df.show()

Output

+---+------+----------------+---------+--------+
| Id|fomrid|     List_values| List_occ|comments|
+---+------+----------------+---------+--------+
|  1|    x1|[22.0, test, 11]|[1, 2, 3]|   text1|
|  1|    x2|            [21]|      [0]|   text4|
|  2|    p1|             [1]|      [1]|   text5|
+---+------+----------------+---------+--------+

from pyspark.sql import types as T
from pyspark.sql import functions as F

# determine max number of fields to include in struct based on max size of each field
max_field_sizes = output_df.selectExpr(
    "MAX(size(List_values)) as max_list_values",
    "MAX(size(List_occ)) as max_list_occ"
).first().asDict()

# dynamically create schemas with nullable string fields, one per possible list position
list_value_schema = T.StructType([T.StructField(str(i), T.StringType(), True) for i in range(max_field_sizes['max_list_values'])])
occ_value_schema = T.StructType([T.StructField(str(i), T.StringType(), True) for i in range(max_field_sizes['max_list_occ'])])

# use udf to complete transformation from list to custom schema
@F.udf(returnType=list_value_schema)
def udf_transform_list_values_to_struct(list_values,max_field_size):
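    # note: max_field_size is passed in for symmetry with the call site but is not
    # used here; the returnType schema already fixes the number of struct fields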
    return_value = {}
    for idx,val in enumerate(list_values):
        return_value[str(idx)]=val
    return return_value

@F.udf(returnType=occ_value_schema)
def udf_transform_list_occ_to_struct(list_values,max_field_size):
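    # note: as above, max_field_size is passed in but not used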
    return_value = {}
    for idx,val in enumerate(list_values):
        return_value[str(idx)]=val
    return return_value

# perform transformations using udfs
updated_output_df = output_df.withColumn('List_values',
                                 udf_transform_list_values_to_struct(
                                     F.col('List_values'),
                                     F.lit(max_field_sizes['max_list_values'])
                                 )
                                )

updated_output_df = updated_output_df.withColumn('List_occ',
                                 udf_transform_list_occ_to_struct(
                                     F.col('List_occ'),
                                     F.lit(max_field_sizes['max_list_occ'])
                                 )
                                )
# verification only. Printing schema and showing results
updated_output_df.printSchema()
updated_output_df.show()

Output

root
 |-- Id: string (nullable = true)
 |-- fomrid: string (nullable = true)
 |-- List_values: struct (nullable = true)
 |    |-- 0: string (nullable = true)
 |    |-- 1: string (nullable = true)
 |    |-- 2: string (nullable = true)
 |-- List_occ: struct (nullable = true)
 |    |-- 0: string (nullable = true)
 |    |-- 1: string (nullable = true)
 |    |-- 2: string (nullable = true)
 |-- comments: string (nullable = true)

+---+------+----------------+---------------+--------+
| Id|fomrid|     List_values|       List_occ|comments|
+---+------+----------------+---------------+--------+
|  1|    x1|{22.0, test, 11}|      {1, 2, 3}|   text1|
|  1|    x2|{21, null, null}|{0, null, null}|   text4|
|  2|    p1| {1, null, null}|{1, null, null}|   text5|
+---+------+----------------+---------------+--------+
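If the goal is to consume this from Pig, one option is to persist the struct-typed result as Parquet and read it on the Pig side. A minimal sketch (the output path /tmp/tuple_structs is a placeholder, not from the original post):

# write the struct-typed dataframe as parquet; the path below is a placeholder
updated_output_df.write.mode("overwrite").parquet("/tmp/tuple_structs")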
