PySpark DataFrame在使用爆炸之前将字符串的列更改为数组 [英] PySpark DataFrame change column of string to array before using explode

查看:138
本文介绍了PySpark DataFrame在使用爆炸之前将字符串的列更改为数组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的spark DataFrame中有一个json格式的名为 event_data 的列,使用from_json读取后,我得到了以下架构:

I have a column called event_data in json format in my spark DataFrame, after reading it using from_json, I get this schema:

root
 |-- user_id: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- af_content_id: string (nullable = true)
 |    |-- af_currency: string (nullable = true)
 |    |-- af_order_id: long (nullable = true)

我仅需要此列中的af_content_id.此属性可以具有不同的格式:

  • 字符串
  • 整数
  • Int和Str的列表.例如['ghhjj23','123546',12356]
  • 无(有时event_data不包含af_content_id)

    I only need af_content_id from this column. This attribute can be of different formats:

  • a String
  • an Integer
  • a List of Int and Str. eg ['ghhjj23','123546',12356]
  • None (sometimes event_data doesn't contain af_content_id)

    当格​​式为 List 时,我想使用explode函数为af_content_id中的每个元素返回新行.但是当我应用它时,我得到一个错误:

    I want to use explode function in order to return a new row for each element in af_content_id when it is of format List. But as when I apply it, I get an error:

    from pyspark.sql.functions import explode
    
    def get_content_id(column):
        return column.af_content_id
    
    df_transf_1 = df_transf_1.withColumn(
        "products_basket", 
        get_content_id(df_transf_1.event_data)
    )
    
    df_transf_1 = df_transf_1.withColumn(
        "product_id",
        explode(df_transf_1.products_basket)
    )
    

    由于数据类型不匹配而无法解析'explode(products_basket)':函数explode的输入应该是数组或映射类型,而不是StringType;

    cannot resolve 'explode(products_basket)' due to data type mismatch: input to function explode should be array or map type, not StringType;

    我知道原因,这是因为字段af_content_id可能包含的不同类型,但是我不知道如何解决它.直接在列上使用pyspark.sql.functions.array()不起作用,因为它会变成array数组并爆炸不会产生预期的结果.

    I know the reason, it's because of the different types that the field af_content_id may contain, but I don't know how to resolve it. Using pyspark.sql.functions.array() directly on the column doesn't work because it become array of array and explode will not produce the expected result.

    用于重现我坚持的步骤的示例代码:

    A sample code to reproduce the step that I'm stuck on:

    import pandas as pd
    
    arr = [
        ['b5ad805c-f295-4852-82fc-961a88',12732936],
        ['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
        ['0E3D17EA-BEEF-4931-8104','12909841'],
        ['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
    ]
    
    df = pd.DataFrame(arr, columns = ['user_id','products_basket'])
    
    df = df[['user_id','products_basket']].astype(str)
    df_transf_1 = spark.createDataFrame(df)
    

    我正在寻找一种将 products_basket 转换为唯一可能的格式的方法: Array ,以便当我应用explode时,每个格式将包含一个id行.

    I'm looking for a way to convert products_basket to one only possible format: an Array so that when I apply explode, it will contain one id per row.

    推荐答案

    如果您从以下数据帧开始:

    If you are starting with a DataFrame like:

    df_transf_1.show(truncate=False)
    #+--------------------------------+------------------------------+
    #|user_id                         |products_basket               |
    #+--------------------------------+------------------------------+
    #|b5ad805c-f295-4852-82fc-961a88  |12732936                      |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |['Gklb38', '123655']          |
    #|0E3D17EA-BEEF-4931-8104         |12909841                      |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
    #+--------------------------------+------------------------------+
    

    其中products_basket列是StringType:

    df.printSchema()
    #root
    # |-- user_id: string (nullable = true)
    # |-- products_basket: string (nullable = true)
    

    您不能在products_basket上调用explode,因为它不是数组或映射.

    You can't call explode on products_basket because it's not an array or map.

    一种解决方法是删除所有前导/后方方括号,然后在", "上分割字符串(逗号后跟一个空格).这会将字符串转换为字符串数组.

    One workaround is to remove any leading/trailing square brackets and then split the string on ", " (comma followed by a space). This will convert the string into an array of strings.

    from pyspark.sql.functions import col, regexp_replace, split
    df_transf_new= df_transf_1.withColumn(
        "products_basket",
        split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
    )
    
    df_transf_new.show(truncate=False)
    #+--------------------------------+------------------------------+
    #|user_id                         |products_basket               |
    #+--------------------------------+------------------------------+
    #|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |
    #|0E3D17EA-BEEF-4931-8104         |[12909841]                    |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
    #+--------------------------------+------------------------------+
    

    正则表达式模式与以下任意一项匹配:

    The regular expression pattern matches any of the following:

    • (^\[):字符串开头的方括号
    • (\]$):字符串末尾的右方括号
    • ('):任何单引号(因为您的字符串都被引用了)
    • (^\[): An opening square bracket at the start of the string
    • (\]$): A closing square bracket at the end of the string
    • ('): Any single quote (because your strings are quoted)

    ,并将它们替换为空字符串.

    and replaces these with an empty string.

    这假定您的数据在product_basket中不包含任何需要的单引号或方括号.

    This assumes that your data does not contain any needed single quotes or square brackets inside the product_basket.

    split之后,新DataFrame的架构为:

    After the split, the schema of the new DataFrame is:

    df_transf_new.printSchema()
    #root
    # |-- user_id: string (nullable = true)
    # |-- products_basket: array (nullable = true)
    # |    |-- element: string (containsNull = true)
    

    现在您可以拨打explode:

    from pyspark.sql.functions import explode
    df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
    #+--------------------------------+------------------------------+----------+
    #|user_id                         |products_basket               |product_id|
    #+--------------------------------+------------------------------+----------+
    #|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |12732936  |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |Gklb38    |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |123655    |
    #|0E3D17EA-BEEF-4931-8104         |[12909841]                    |12909841  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
    #+--------------------------------+------------------------------+----------+
    

    这篇关于PySpark DataFrame在使用爆炸之前将字符串的列更改为数组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆