Column values to dynamically define struct
Problem description
I have two nested arrays: one of strings, the other of floats. I would like to essentially zip them up and have one (value, var) combo per row. I was trying to do it with just a DataFrame, without resorting to RDDs or UDFs, thinking that this would be cleaner and faster.
I can turn the per-row arrays of values and variables into one (value, variable) struct per row, but because my array sizes vary I have to run my array comprehension over different ranges. So I thought I could just specify the length in a column and use that. But because I would be using a Column, it fails. Any suggestions on how to use a column to dynamically build a struct like this (without an RDD/UDF if possible)?:
from pyspark.sql.functions import col, array, struct, explode

DF1 = spark.createDataFrame(
    [(["a", "b", "c", "d", "e", "f"], [1, 2, 3, 4, 5, 6], 6),
     (["g"], [7], 1),
     (["a", "b", "g", "c"], [4, 5, 3, 6], 4),
     (["c", "d"], [2, 3], 2),
     (["a", "b", "c"], [5, 7, 2], 3)],
    ["vars", "vals", "num_elements"])
DF1.show()

arrayofstructs = array(*[struct(
    DF1.vars[c].alias("variables"),
    DF1.vals[c].alias("values")
# ) for c in DF1.num_elements])  # <- DOES NOT WORK: a Column is not iterable
) for c in range(10)])           # <- FIXED SIZE DOES WORK
DF2 = DF1.withColumn("new", explode(arrayofstructs))
DF2.show()
DF3 = DF2.filter(DF2.new.variables.isNotNull())
DF3.show()
+------------------+------------------+------------+
| vars| vals|num_elements|
+------------------+------------------+------------+
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|
| [g]| [7]| 1|
| [a, b, g, c]| [4, 5, 3, 6]| 4|
| [c, d]| [2, 3]| 2|
| [a, b, c]| [5, 7, 2]| 3|
+------------------+------------------+------------+
+------------------+------------------+------------+------+
| vars| vals|num_elements| new|
+------------------+------------------+------------+------+
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[a, 1]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[b, 2]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[c, 3]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[d, 4]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[e, 5]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[f, 6]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6| [,]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6| [,]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6| [,]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6| [,]|
| [g]| [7]| 1|[g, 7]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
| [g]| [7]| 1| [,]|
+------------------+------------------+------------+------+
only showing top 20 rows
+------------------+------------------+------------+------+
| vars| vals|num_elements| new|
+------------------+------------------+------------+------+
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[a, 1]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[b, 2]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[c, 3]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[d, 4]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[e, 5]|
|[a, b, c, d, e, f]|[1, 2, 3, 4, 5, 6]| 6|[f, 6]|
| [g]| [7]| 1|[g, 7]|
| [a, b, g, c]| [4, 5, 3, 6]| 4|[a, 4]|
| [a, b, g, c]| [4, 5, 3, 6]| 4|[b, 5]|
| [a, b, g, c]| [4, 5, 3, 6]| 4|[g, 3]|
| [a, b, g, c]| [4, 5, 3, 6]| 4|[c, 6]|
| [c, d]| [2, 3]| 2|[c, 2]|
| [c, d]| [2, 3]| 2|[d, 3]|
| [a, b, c]| [5, 7, 2]| 3|[a, 5]|
| [a, b, c]| [5, 7, 2]| 3|[b, 7]|
| [a, b, c]| [5, 7, 2]| 3|[c, 2]|
+------------------+------------------+------------+------+
Recommended answer
You can try a hack like this:
from pyspark.sql.functions import col, posexplode, expr, split

(DF1
    .select("*", posexplode(split(expr("repeat('_', num_elements - 1)"), '_')))
    .select(col("vars").getItem(col("pos")), col("vals").getItem(col("pos")))
    .show())
# +---------+---------+
# |vars[pos]|vals[pos]|
# +---------+---------+
# | a| 1|
# | b| 2|
# | c| 3|
# | d| 4|
# | e| 5|
# | f| 6|
# | g| 7|
# | a| 4|
# | b| 5|
# | g| 3|
# | c| 6|
# | c| 2|
# | d| 3|
# | a| 5|
# | b| 7|
# | c| 2|
# +---------+---------+
but it is anything but "cleaner and faster". Personally I would use an RDD:
(DF1.rdd
    .flatMap(lambda row: ((val, var) for val, var in zip(row.vals, row.vars)))
    .toDF(["val", "var"])
    .show())
# +---+---+
# |val|var|
# +---+---+
# | 1| a|
# | 2| b|
# | 3| c|
# | 4| d|
# | 5| e|
# | 6| f|
# | 7| g|
# | 4| a|
# | 5| b|
# | 3| g|
# | 6| c|
# | 2| c|
# | 3| d|
# | 5| a|
# | 7| b|
# | 2| c|
# +---+---+
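The `flatMap` step above is just a per-row zip-and-flatten; its logic can be checked in plain Python without a cluster (sample rows below are a subset of the question's data):

```python
# Each row carries a (vars, vals) pair of equal-length lists
rows = [
    (["a", "b", "c"], [5, 7, 2]),
    (["g"], [7]),
]

# Mirror the flatMap: zip each row's lists, emit (val, var) tuples
flattened = [(val, var) for vars_, vals in rows
             for var, val in zip(vars_, vals)]
print(flattened)  # -> [(5, 'a'), (7, 'b'), (2, 'c'), (7, 'g')]
```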
but a udf would work as well.