Spark: How to transpose and explode columns with nested arrays
Question
I applied the algorithm from the question below (see NOTE) to transpose and explode a nested Spark dataframe.
When I define cols = ['a', 'b'] I get an empty dataframe, but when I define cols = ['a'] I get the transformed dataframe for the a column. See the Current code section below for more details. Any help would be appreciated.
I'm looking for required output 2 (transpose and explode), but even an example of required output 1 (transpose only) would be very useful.
NOTE: This is a minimal example to highlight the problem; in reality the dataframe schema and array lengths vary, as in the example Pyspark: How to flatten nested arrays by merging values in spark.
Input df:
+---+------------------+--------+
| id|                 a|       b|
+---+------------------+--------+
|  1|[{1, 1}, {11, 11}]|    null|
|  2|              null|[{2, 2}]|
+---+------------------+--------+
root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
Required output 1 (transpose_df):
+---+----+------------------+
| id|cols|            arrays|
+---+----+------------------+
|  1|   a|[{1, 1}, {11, 11}]|
|  2|   b|          [{2, 2}]|
+---+----+------------------+
Required output 2 (explode_df):
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
|  1|   a|   1|  1|
|  1|   a|  11| 11|
|  2|   b|   2|  2|
+---+----+----+---+
Current code:
import pyspark.sql.functions as f
df = spark.read.json(sc.parallelize([
    """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
    """{"id":2,"b":[{"date":2,"val":2}]}"""]))
cols = ['a', 'b']
expressions = [f.expr('TRANSFORM({col}, el -> STRUCT("{col}" AS cols, el.date, el.val))'.format(col=col)) for col in cols]
transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')
explode_df.show()
Current result:
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+
Answer
stack is probably a better fit than transpose here.
expr = f"stack({len(cols)}," + \
       ",".join([f"'{c}',{c}" for c in cols]) + \
       ")"
# expr == "stack(2,'a',a,'b',b)"
transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("arrays is not null")
explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')