Pyspark: How to flatten nested arrays by merging values in spark


Problem description

I have 10000 JSONs with different ids, and each has 10000 names. How can I flatten the nested arrays by merging the values by int or str in pyspark?

I have added the column name_10000_xvz to better explain the data structure. I have also updated the Notes, the input df, the required output df, and the input JSON files.

Notes:

  • The input dataframe has more than 10000 columns (name_1_a, name_1000_xx, ...), so the column (array) names cannot be hardcoded, as that would require writing out 10000 names
  • id, date, val always follow the same naming convention across all columns and all JSONs
  • Array sizes can vary, but date and val are always present, so they can be hardcoded
  • date can differ in each array; for example, name_1_a starts at 2001, but name_10000_xvz for id == 1 starts at 2000 and finishes at 2004, while for id == 2 it starts at 1990 and finishes at 2004

Input df:

root
 |-- id: long (nullable = true)
 |-- name_10000_xvz: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_2_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|id |name_10000_xvz                                                          |name_1_a                         |name_1_b                         |name_2_a                            |
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|2  |[{1990, 39}, {2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}, {2004, 34}]|[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
|1  |[{2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}]                        |[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+

Required output df:

+---+---------+----------+-----------+---------+----------------+
|id |   date  | name_1_a | name_1_b  |name_2_a | name_10000_xvz |
+---+---------+----------+-----------+---------+----------------+
|1  |   2000  |     0    |    0      |   0     |        30      |
|1  |   2001  |     1    |    4      |   21    |        31      |
|1  |   2002  |     2    |    5      |   22    |        32      |
|1  |   2003  |     3    |    6      |   23    |        33      |
|2  |   1990  |     0    |    0      |   0     |        39      |
|2  |   2000  |     0    |    0      |   0     |        30      |
|2  |   2001  |     1    |    4      |   21    |        31      |
|2  |   2002  |     2    |    5      |   22    |        32      |
|2  |   2003  |     3    |    6      |   23    |        33      |
|2  |   2004  |     0    |    0      |   0     |        34      |
+---+---------+----------+-----------+---------+----------------+

To reproduce the input df:

# Assumes an active SparkSession `spark` and SparkContext `sc`
df = spark.read.json(sc.parallelize([
  """{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
  """{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]))
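A quick check that the reproduced df matches the schema and rows shown above:

df.printSchema()
df.show(truncate=False)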


Recommended answer

UPDATE

As @werner has mentioned, it is necessary to transform all the structs to append the column name into them.

import pyspark.sql.functions as f

# Collect every array column dynamically instead of hardcoding 10000 names
names = [column for column in df.columns if column.startswith('name_')]

# Rewrite each array element as a struct that also carries its column name
expressions = []
for name in names:
  expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

# Merge all arrays into one and explode the structs into rows
flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

# Pivot the column names back out, one value column per original array
output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1  |2000|30            |null    |null    |null    |
|1  |2001|31            |1       |4       |21      |
|1  |2002|32            |2       |5       |22      |
|1  |2003|33            |3       |6       |23      |
|2  |1990|39            |null    |null    |null    |
|2  |2000|30            |null    |null    |null    |
|2  |2001|31            |1       |4       |21      |
|2  |2002|32            |2       |5       |22      |
|2  |2003|33            |3       |6       |23      |
|2  |2004|34            |null    |null    |null    |
+---+----+--------------+--------+--------+--------+
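One gap against the required output df: the pivot yields null where the question shows 0. If zeros are wanted, a fillna over the pivoted name columns (reusing the names list from above) closes it:

# Replace the nulls produced by the pivot with 0, as in the required output df
output_df = output_df.fillna(0, subset=names)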

Assumptions:

  • the date value is always the same across all columns
  • name_1_a, name_1_b, name_2_a have equal sizes

import pyspark.sql.functions as f

# Walk the arrays by index: since the arrays are assumed equal-sized and
# aligned on date, the i-th element of each array belongs to the same row
output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> ' \
                                           'STRUCT(name_1_a[i].date AS date, ' \
                                           '       name_1_a[i].val AS name_1_a, ' \
                                           '       name_1_b[i].val AS name_1_b, ' \
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|1  |2002|2       |5       |22      |
|1  |2003|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2002|2       |5       |22      |
|2  |2003|3       |6       |23      |
+---+----+--------+--------+--------+
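Under the same assumptions, arrays_zip (Spark 2.4+) can replace the manual index walk; a minimal sketch, unpacking the zipped struct fields by hand:

import pyspark.sql.functions as f

# arrays_zip pairs up the i-th elements of each array; inline explodes
# the result into one struct column per original array
zipped_df = df.selectExpr('id', 'inline(arrays_zip(name_1_a, name_1_b, name_2_a))')
output_df = zipped_df.select(
    'id',
    f.col('name_1_a.date').alias('date'),  # date is identical across arrays here
    f.col('name_1_a.val').alias('name_1_a'),
    f.col('name_1_b.val').alias('name_1_b'),
    f.col('name_2_a.val').alias('name_2_a'))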

