PySpark DataFrame change column of string to array before using explode
Question
I have a column called event_data in JSON format in my Spark DataFrame. After reading it using from_json, I get this schema:
root
|-- user_id: string (nullable = true)
|-- event_data: struct (nullable = true)
| |-- af_content_id: string (nullable = true)
| |-- af_currency: string (nullable = true)
| |-- af_order_id: long (nullable = true)
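For context, a minimal sketch of how a schema like this could be produced with from_json; the struct fields are taken from the printout above, while the schema definition itself is illustrative and not code from the original question:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Assumed struct schema matching the printout above
event_schema = StructType([
    StructField("af_content_id", StringType(), True),
    StructField("af_currency", StringType(), True),
    StructField("af_order_id", LongType(), True),
])

# Parse the JSON string column into a struct column (overwriting event_data)
df = df.withColumn("event_data", from_json(col("event_data"), event_schema))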
I only need af_content_id from this column. This attribute can come in different formats: sometimes a single value, sometimes a list such as ['ghhjj23','123546',12356], and sometimes missing (some rows' event_data don't contain af_content_id).
I want to use the explode function in order to return a new row for each element in af_content_id when it is in list format. But when I apply it, I get an error:
from pyspark.sql.functions import explode

def get_content_id(column):
    return column.af_content_id

df_transf_1 = df_transf_1.withColumn(
    "products_basket",
    get_content_id(df_transf_1.event_data)
)
df_transf_1 = df_transf_1.withColumn(
    "product_id",
    explode(df_transf_1.products_basket)
)
cannot resolve 'explode(products_basket)' due to data type mismatch: input to function explode should be array or map type, not StringType;
I know the reason: it's because of the different types the field af_content_id may contain, but I don't know how to resolve it. Using pyspark.sql.functions.array() directly on the column doesn't work, because it just wraps the whole string in a single-element array and explode will not produce the expected result.
Sample code to reproduce the step I'm stuck on:
import pandas as pd

arr = [
    ['b5ad805c-f295-4852-82fc-961a88', 12732936],
    ['0FD6955D-484C-4FC8-8C3F-DA7D28', ['Gklb38', '123655']],
    ['0E3D17EA-BEEF-4931-8104', '12909841'],
    ['CC2877D0-A15C-4C0A-AD65-762A35C1', [12645715, 12909837, 12909837]]
]
df = pd.DataFrame(arr, columns=['user_id', 'products_basket'])
df = df[['user_id', 'products_basket']].astype(str)
df_transf_1 = spark.createDataFrame(df)
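As noted above, wrapping the column in pyspark.sql.functions.array() doesn't help; with this sample data it only produces a one-element array holding the whole string, so explode returns the string unchanged. A quick sketch to see this:
from pyspark.sql.functions import array, col, explode

# array() on the StringType column yields a one-element array<string>,
# so explode returns e.g. "['Gklb38', '123655']" as a single row instead of two ids.
df_transf_1.withColumn("products_basket", array(col("products_basket"))) \
           .withColumn("product_id", explode("products_basket")) \
           .show(truncate=False)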
I'm looking for a way to convert products_basket to the one possible format, an Array, so that when I apply explode, it will contain one id per row.
Answer
If you are starting with a DataFrame like:
df_transf_1.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id |products_basket |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88 |12732936 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |['Gklb38', '123655'] |
#|0E3D17EA-BEEF-4931-8104 |12909841 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+
where the products_basket column is a StringType:
df_transf_1.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: string (nullable = true)
You can't call explode on products_basket because it's not an array or map.
One workaround is to remove any leading/trailing square brackets and then split the string on ", " (a comma followed by a space). This will convert the string into an array of strings.
from pyspark.sql.functions import col, regexp_replace, split

df_transf_new = df_transf_1.withColumn(
    "products_basket",
    split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
)
df_transf_new.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id |products_basket |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88 |[12732936] |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |
#|0E3D17EA-BEEF-4931-8104 |[12909841] |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+
The regular expression pattern matches any of the following:

- (^\[): an opening square bracket at the start of the string
- (\]$): a closing square bracket at the end of the string
- ('): any single quote (because your strings are quoted)

and replaces these with an empty string.
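As a quick standalone check of what the pattern does to one sample value, here is a small sketch using Python's re module (not part of the original answer):
import re

sample = "['Gklb38', '123655']"
cleaned = re.sub(r"(^\[)|(\]$)|(')", "", sample)  # "Gklb38, 123655"
print(cleaned.split(", "))                        # ['Gklb38', '123655']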
This assumes that your data does not contain any needed single quotes or square brackets inside products_basket.
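If you are unsure whether that assumption holds, one way to sanity-check it (a sketch, assuming the df_transf_1 built above) is to apply the same replacement and flag rows that still contain brackets or quotes, since those values would be mangled by this approach:
from pyspark.sql.functions import col, regexp_replace

# Rows that still contain '[' , ']' or a single quote after the cleanup would
# not be handled correctly by the simple regex-and-split approach.
suspicious = df_transf_1.filter(
    regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", "").rlike(r"[\[\]']")
)
suspicious.show(truncate=False)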
After the split, the schema of the new DataFrame is:
df_transf_new.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: array (nullable = true)
# | |-- element: string (containsNull = true)
Now you can call explode:
from pyspark.sql.functions import explode
df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
#+--------------------------------+------------------------------+----------+
#|user_id |products_basket |product_id|
#+--------------------------------+------------------------------+----------+
#|b5ad805c-f295-4852-82fc-961a88 |[12732936] |12732936 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |Gklb38 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |123655 |
#|0E3D17EA-BEEF-4931-8104 |[12909841] |12909841 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837 |
#+--------------------------------+------------------------------+----------+