How to transform a few columns of a dataframe to a tuple
I have a dataframe that looks like this
+---+------+------+---+--------+
| Id|fomrid|values|occ|comments|
+---+------+------+---+--------+
|  1|    x1|  22.0|  1|   text1|
|  1|    x1|  test|  2|   text2|
|  1|    x1|    11|  3|   text3|
|  1|    x2|    21|  0|   text4|
|  2|    p1|     1|  1|   text5|
+---+------+------+---+--------+
The transformed dataframe should look like this
+---+------+----------------+----------+--------+
| Id|fomrid|   tuple(values)|tuple(occ)|comments|
+---+------+----------------+----------+--------+
|  1|    x1|(22.0, test, 11)| (1, 2, 3)|   text1|
|  1|    x2|            (21)|       (0)|   text4|
|  2|    p1|             (1)|       (1)|   text5|
+---+------+----------------+----------+--------+
At the time of writing, it should be noted that Spark does not have a data/column type for tuples; the closest representation is a list.
However, if you would like your data represented as a tuple string, you may use collect_list
to group your items, then cast the result to a string and replace the square brackets with parentheses.
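Concretely, casting an array column such as [22.0, test, 11] to a string yields the text "[22.0, test, 11]", and a bracket-stripping regex turns it into a tuple-looking string. The same string manipulation in plain Python (an illustration of the logic only, not Spark itself — the helper name is mine):

```python
import re

def to_tuple_string(array_as_string: str) -> str:
    # strip the square brackets Spark's STRING cast adds,
    # then wrap the remaining items in parentheses
    return "(" + re.sub(r"[\[\]]", "", array_as_string) + ")"

print(to_tuple_string("[22.0, test, 11]"))  # (22.0, test, 11)
print(to_tuple_string("[21]"))              # (21)
```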
Using spark sql
Create a temporary view and run the SQL below on your Spark session
input_df.createOrReplaceTempView("my_temp_table_or_view")
output_df = sparkSession.sql("<insert sql below here>")
SELECT
Id,
fomrid,
CONCAT('(',REGEXP_REPLACE(CAST(collect_list(values) AS STRING),'[\\\[\\\]]',''),')') as tuple_values,
CONCAT('(',REGEXP_REPLACE(CAST(collect_list(occ) AS STRING),'[\\\[\\\]]',''),')') as tuple_occ,
MIN(comments) as comments
FROM
my_temp_table_or_view
GROUP BY
Id, fomrid
Outputs
+---+------+----------------+---------+--------+
| Id|fomrid| tuple_values|tuple_occ|comments|
+---+------+----------------+---------+--------+
| 1| x1|(22.0, test, 11)|(1, 2, 3)| text1|
| 1| x2| (21)| (0)| text4|
| 2| p1| (1)| (1)| text5|
+---+------+----------------+---------+--------+
Using pyspark api
from pyspark.sql import functions as F

output_df = (
input_df.groupBy(["Id","fomrid"])
.agg(
F.concat(
F.lit('('),
F.regexp_replace(
F.collect_list("values").cast("STRING"),
'[\\[\\]]',
''
),
F.lit(')')
).alias("tuple_values"),
F.concat(
F.lit('('),
F.regexp_replace(
F.collect_list("occ").cast("STRING"),
'[\\[\\]]',
''
),
F.lit(')')
).alias("tuple_occ"),
F.min("comments").alias("comments")
)
)
Using scala spark api
import org.apache.spark.sql.functions._

val output_df =
input_df.groupBy("Id","fomrid")
.agg(
concat(
lit("("),
regexp_replace(
collect_list("values").cast("STRING"),
"[\\[\\]]",
""
),
lit(")")
).alias("tuple_values"),
concat(
lit("("),
regexp_replace(
collect_list("occ").cast("STRING"),
"[\\[\\]]",
""
),
lit(")")
).alias("tuple_occ"),
min("comments").alias("comments")
)
Edit 1:
At the time of writing, as it pertains to the tuple column type you want for your Pig script, neither Spark nor Parquet natively supports a tuple type. Pig uses special writers/readers to convert Parquet fields when using this type.
It may be preferable to modify or use an intermediary Pig script to:
- read, transform, and store the data in the desired type, as in this example, or
- convert a string to a tuple, as in this example.
- You could also try saving as a struct type and seeing if that works for you. I've included a snippet, written in PySpark, that dynamically determines and creates a struct type for the fields List_values and List_occ. Although I have tested this code on my Spark setup, I do not have a Pig setup at this time. Please let me know whether any of these approaches, including the links above, works for you.
from pyspark.sql import functions as F
output_df = (
input_df.groupBy("Id","fomrid")
.agg(
F.collect_list("values").alias("List_values"),
F.collect_list("occ").alias("List_occ"),
F.min("comments").alias("comments")
)
)
output_df.show()
Output
+---+------+----------------+---------+--------+
| Id|fomrid| List_values| List_occ|comments|
+---+------+----------------+---------+--------+
| 1| x1|[22.0, test, 11]|[1, 2, 3]| text1|
| 1| x2| [21]| [0]| text4|
| 2| p1| [1]| [1]| text5|
+---+------+----------------+---------+--------+
from pyspark.sql import types as T
from pyspark.sql import functions as F
# determine max number of fields to include in struct based on max size of each field
max_field_sizes = output_df.selectExpr(
"MAX(size(List_values)) as max_list_values",
"MAX(size(List_occ)) as max_list_occ"
).first().asDict()
# dynamically create schemas with nullable fields to support these fields
list_value_schema = T.StructType([ T.StructField(str(i),T.StringType(),True) for i in range(0,max_field_sizes['max_list_values'])])
occ_value_schema = T.StructType([ T.StructField(str(i),T.StringType(),True) for i in range(0,max_field_sizes['max_list_occ'])])
# use udf to complete transformation from list to custom schema
@F.udf(returnType=list_value_schema)
def udf_transform_list_values_to_struct(list_values,max_field_size):
return_value = {}
for idx,val in enumerate(list_values):
return_value[str(idx)]=val
return return_value
@F.udf(returnType=occ_value_schema)
def udf_transform_list_occ_to_struct(list_values,max_field_size):
return_value = {}
for idx,val in enumerate(list_values):
return_value[str(idx)]=val
return return_value
# perform transformations using udfs
updated_output_df = output_df.withColumn('List_values',
udf_transform_list_values_to_struct(
F.col('List_values'),
F.lit(max_field_sizes['max_list_values'])
)
)
updated_output_df = updated_output_df.withColumn('List_occ',
udf_transform_list_occ_to_struct(
F.col('List_occ'),
F.lit(max_field_sizes['max_list_occ'])
)
)
# verification only. Printing schema and showing results
updated_output_df.printSchema()
updated_output_df.show()
Output
root
|-- Id: string (nullable = true)
|-- fomrid: string (nullable = true)
|-- List_values: struct (nullable = true)
| |-- 0: string (nullable = true)
| |-- 1: string (nullable = true)
| |-- 2: string (nullable = true)
|-- List_occ: struct (nullable = true)
| |-- 0: string (nullable = true)
| |-- 1: string (nullable = true)
| |-- 2: string (nullable = true)
|-- comments: string (nullable = true)
+---+------+----------------+---------------+--------+
| Id|fomrid| List_values| List_occ|comments|
+---+------+----------------+---------------+--------+
| 1| x1|{22.0, test, 11}| {1, 2, 3}| text1|
| 1| x2|{21, null, null}|{0, null, null}| text4|
| 2| p1| {1, null, null}|{1, null, null}| text5|
+---+------+----------------+---------------+--------+
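The nulls visible in the output come from the UDFs returning a dict keyed by positional field name: positions beyond a list's length are simply absent from the dict, and Spark fills the corresponding nullable struct fields with null. A plain-Python sketch of that core transformation, with the padding made explicit (the helper name is mine):

```python
def list_to_struct_dict(items, max_field_size):
    # map each element to its positional field name; trailing positions
    # with no element become None, matching the nullable struct fields
    return {str(i): (items[i] if i < len(items) else None)
            for i in range(max_field_size)}

print(list_to_struct_dict(["21"], 3))  # {'0': '21', '1': None, '2': None}
```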