Spark: How to transpose and explode columns with dynamic nested arrays


Problem description

I applied the algorithm from the question Spark: How to transpose and explode columns with nested arrays to transpose and explode a nested Spark dataframe with dynamic arrays.

I have added the row """{"id":3,"c":[{"date":3,"val":3,"val_dynamic":3}]}""" to the dataframe, introducing a new column c whose array has a new val_dynamic field that can appear on a random basis.

I'm looking for required output 2 (transpose and explode), but even an example of required output 1 (transpose) would be very useful.

Input df:

+------------------+--------+-----------+---+
|                 a|       b|          c| id|
+------------------+--------+-----------+---+
|[{1, 1}, {11, 11}]|    null|       null|  1|
|              null|[{2, 2}]|       null|  2|
|              null|    null|[{3, 3, 3}]|  3|   !!! NOTE: Added `val_dynamic`
+------------------+--------+-----------+---+


root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)  !!! NOTE: Added `val_dynamic`
 |-- id: long (nullable = true)

Required output 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
|  3|  c   | [{3, 3, 3}]       | !!! NOTE: Added `val_dynamic`
+---+------+-------------------+

Required output 2 (explode_df):

+---+----+----+---+-----------+
| id|cols|date|val|val_dynamic|
+---+----+----+---+-----------+
|  1|   a|   1|  1|   null    |
|  1|   a|  11| 11|   null    |
|  2|   b|   2|  2|   null    |
|  3|   c|   3|  3|      3    |  !!! NOTE: Added `val_dynamic`
+---+----+----+---+-----------+

Current code:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}}""",
  """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}}"""
    ]))

df.show()

cols = [ 'a', 'b', 'c']

#e.g. expr = stack(3,'a',a,'b',b,'c',c)
expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"


transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("arrays is not null")

transpose_df.show()

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
explode_df.show()

Current result:

AnalysisException: cannot resolve 'stack(3, 'a', `a`, 'b', `b`, 'c', `c`)' due to data type mismatch: Argument 2 (array<struct<date:bigint,val:bigint>>) != Argument 6 (array<struct<date:bigint,val:bigint,val_dynamic:bigint>>); line 1 pos 0;
'Project [id#2304L, unresolvedalias(stack(3, a, a#2301, b, b#2302, c, c#2303), Some(org.apache.spark.sql.Column$$Lambda$2580/0x00000008411d3040@4d9eefd0))]
+- LogicalRDD [a#2301, b#2302, c#2303, id#2304L], false

ref: Transpose column to row with Spark

Answer

stack requires that all stacked columns have the same type. The problem here is that the structs inside the arrays have different members. One approach is to add the missing members to all structs, so that the approach from my previous answer works again.

cols = ['a', 'b', 'c']

#create a map containing all struct fields per column
existing_fields = {c:list(map(lambda field: field.name, df.schema.fields[i].dataType.elementType.fields)) 
      for i,c in enumerate(df.columns) if c in cols}
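#e.g. for the input above:
#{'a': ['date', 'val'], 'b': ['date', 'val'], 'c': ['date', 'val', 'val_dynamic']}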

#get a (unique) set of all fields that exist in all columns
all_fields = set(sum(existing_fields.values(),[]))
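#e.g. for the input above: {'date', 'val', 'val_dynamic'} (iteration order is arbitrary)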

#create a list of transform expressions to fill up the structs with null fields
transform_exprs = [f"transform({c}, e -> named_struct(" + 
    ",".join([f"'{f}', {('e.'+f) if f in existing_fields[c] else 'cast(null as long)'}" for f in all_fields]) 
    + f")) as {c}" for c in cols]

#create a df where all columns contain arrays with the same struct
full_struct_df = df.selectExpr("id", *transform_exprs)
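
To see what is being generated, you can print the transform expressions; each one rebuilds the array elements as named_structs containing the full member set, filling the missing members with nulls (the exact field order depends on set iteration and may vary):

for e in transform_exprs:
    print(e)

#prints something like (field order may differ):
#transform(a, e -> named_struct('val', e.val, 'val_dynamic', cast(null as long), 'date', e.date)) as a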

full_struct_df now has the schema:

root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)

Note that the struct members are reordered relative to the input schema: all_fields is a Python set, so its iteration order is arbitrary, but it is the same for every column, which is all that stack needs.

From here on, the logic works as before:

stack_expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"

transpose_df = full_struct_df.selectExpr("id", stack_expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("arrays is not null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
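
As a quick check, showing the result reproduces required output 2; since inline emits the struct members in the rebuilt (arbitrary) order, a final select pins the column order down:

explode_df.select('id', 'cols', 'date', 'val', 'val_dynamic').show()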

The first part of this answer requires that

  • each column mentioned in cols is an array of structs
  • all members of all structs are longs; the reason for this restriction is the cast(null as long) used when creating the transform expressions (one possible generalization is sketched below)
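
If the structs mix member types, one way to lift the long-only restriction (a sketch, not part of the original answer) is to look up each field's type in the schema and cast the null accordingly:

#Sketch: derive each missing field's type from whichever column defines it,
#instead of hardcoding long. Assumes a field name never appears with two
#different types in different columns.
field_types = {}
for c in cols:
    for field in df.schema[c].dataType.elementType.fields:
        field_types[field.name] = field.dataType.simpleString()

transform_exprs = [
    f"transform({c}, e -> named_struct(" +
    ",".join(
        f"'{f}', {('e.' + f) if f in existing_fields[c] else f'cast(null as {field_types[f]})'}"
        for f in all_fields
    ) +
    f")) as {c}"
    for c in cols
]

#then rebuild full_struct_df with df.selectExpr("id", *transform_exprs) as above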
