pyspark: filtering and extracting a struct through an ArrayType column


Question

I'm using pyspark 2.2 and have the following schema:

root
 |-- col1: string (nullable = true)
 |-- col2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

and the following data:

+----+----------------------------------------------+
|col1|col2                                          |
+----+----------------------------------------------+
|A   |[[id1, [k -> v1]], [id2, [k2 -> v5, k -> v2]]]|
|B   |[[id3, [k -> v3]], [id4, [k3 -> v6, k -> v4]]]|
+----+----------------------------------------------+

col2 is a complex structure: an array of structs, where every struct has two fields, an id string and a metadata map. (This is a simplified dataset; the real dataset has 10+ fields within each struct and 10+ key-value pairs in the metadata map.)

I want to form a query that returns a dataframe matching my filtering logic (say col1 == 'A', col2.id == 'id2' and col2.metadata.k == 'v2').

The result would look like this. The filtering logic can match at most one struct within the array, so in the second column it's just a single struct instead of an array containing one struct:

+----+--------------------------+
|col1|col2_filtered             |
+----+--------------------------+
|A   |[id2, [k2 -> v5, k -> v2]]|
+----+--------------------------+

I know how to achieve this through explode, but the issue is that col2 normally has 100+ structs and at most one of them will match my filtering logic, so I don't think explode is a scalable solution (a sketch of the explode approach appears after the setup code below).

Can someone tell me how to do that? Thanks in advance!

Below is the code block for setting things up.

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', ArrayType(
        StructType([
            StructField('id', StringType(), True),
            StructField('metadata', MapType(StringType(), StringType()), True)
        ])
    ))
])

data = [
    ('A', [('id1', {'k': 'v1'}), ('id2', {'k': 'v2', 'k2': 'v5'})]),
    ('B', [('id3', {'k': 'v3'}), ('id4', {'k': 'v4', 'k3': 'v6'})])
]

df = spark.createDataFrame(data=data, schema=schema)
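
For reference, here is a sketch of how the explode approach mentioned above could look (an illustration, not code from the question; it works on the setup above but materializes one row per array element, which is why it doesn't scale):

import pyspark.sql.functions as F

# Explode the array into one row per struct, filter on the struct's fields,
# then keep the single matching struct.
exploded = (
    df.withColumn('s', F.explode('col2'))
      .filter(
          (F.col('col1') == 'A')
          & (F.col('s.id') == 'id2')
          & (F.col('s.metadata')['k'] == 'v2')
      )
      .select('col1', F.col('s').alias('col2_filtered'))
)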

Answer

You can try a UDF:

import pyspark.sql.functions as F

# First UDF: keep only rows whose array contains a matching struct.
# Second UDF: replace the array with that single matching struct.
# Note the predicate only checks that the key 'k' exists; for the exact
# logic in the question, use y.metadata.get('k') == 'v2' instead.
df2 = df.filter(
    F.udf(lambda x: any(y.id == 'id2' and 'k' in y.metadata for y in x), 'boolean')('col2')
).withColumn(
    'col2',
    F.udf(lambda x: [y for y in x if y.id == 'id2' and 'k' in y.metadata][0],
          'struct<id:string,metadata:map<string,string>>')('col2')
)

df2.show(truncate=False)
+----+--------------------------+
|col1|col2                      |
+----+--------------------------+
|A   |[id2, [k2 -> v5, k -> v2]]|
+----+--------------------------+
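
As a variant, the two UDF calls can be collapsed into a single one that returns the matching struct or None, with non-matching rows dropped afterwards; a sketch, assuming the same schema as above:

import pyspark.sql.functions as F

def first_match(structs):
    # Return the first struct matching the filter, or None if nothing matches.
    matches = [s for s in structs if s.id == 'id2' and s.metadata.get('k') == 'v2']
    return matches[0] if matches else None

match_udf = F.udf(first_match, 'struct<id:string,metadata:map<string,string>>')

# A None returned by the UDF becomes a null struct, so filter those rows out.
df2 = (
    df.filter(F.col('col1') == 'A')
      .withColumn('col2', match_udf('col2'))
      .filter(F.col('col2').isNotNull())
)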

Alternatively, you can serialize the columns to JSON and check whether col2's JSON contains the JSON of the desired struct. Note that this relies on the serialized struct appearing verbatim inside the array's JSON string, so the output below assumes a dataset where id2's metadata is exactly [k -> v2]; with the setup data above (where id2 also carries k2 -> v5) the substring check would not match:

import pyspark.sql.functions as F

# Serialize both the array column and the struct we are looking for to JSON
# strings, then filter on substring containment.
df2 = df.filter(
    (F.col('col1') == 'A') &
    F.to_json('col2').contains(
        F.to_json(
            F.struct(
                F.lit('id2').alias('id'),
                F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
            )
        )
    )
)

df2.show(truncate=False)
+----+------------------------------------+
|col1|col2                                |
+----+------------------------------------+
|A   |[[id1, [k -> v1]], [id2, [k -> v2]]]|
+----+------------------------------------+

If you just want to keep the matching struct in col2, you can replace it using withColumn:

# Since the filter already guarantees this exact struct is present, rebuild
# it from the same literals instead of extracting it from the array.
df3 = df2.withColumn(
    'col2',
    F.struct(
        F.lit('id2').alias('id'),
        F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
    )
)

df3.show()
+----+----------------+
|col1|            col2|
+----+----------------+
|   A|[id2, [k -> v2]]|
+----+----------------+
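
If upgrading Spark is an option, the built-in higher-order function filter (Spark 2.4+, so not available on the 2.2 mentioned in the question) does this without UDFs or JSON round-trips; a sketch:

import pyspark.sql.functions as F

# Spark 2.4+ only: filter the array natively; [0] yields null when no
# element matches, so a final isNotNull filter drops non-matching rows.
df4 = (
    df.filter(F.col('col1') == 'A')
      .withColumn(
          'col2_filtered',
          F.expr("filter(col2, x -> x.id = 'id2' and x.metadata['k'] = 'v2')")[0]
      )
      .filter(F.col('col2_filtered').isNotNull())
      .select('col1', 'col2_filtered')
)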
