How to zip two array columns in Spark SQL


Question

I have a Pandas dataframe. I tried to first split the string values of two columns into lists and then, using zip, join each pair of elements with '_'. My data set looks like this:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'
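
For reference, a minimal Pandas DataFrame matching that description (one illustrative row, mirroring the sample values above) can be built like this:

import pandas as pd

# Single-row example; the values mirror the sample above
df = pd.DataFrame({
    'column_1': ['abc, def, ghi'],
    'column_2': ['1.0, 2.0, 3.0'],
})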

I wanted to join these two columns into a third column, as below, for each row of my dataframe.

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

I have successfully done this in Python using the code below, but the dataframe is quite large and it takes a very long time to run over the whole thing. I want to do the same in PySpark for efficiency. I have read the data into a Spark dataframe successfully, but I'm having a hard time determining how to replicate the Pandas logic with equivalent PySpark functions. How can I get my desired result in PySpark?

# Row-by-row version (slow for a large dataframe)
df['column_3'] = df['column_2']
for index, row in df.iterrows():
    if isinstance(row['column_1'], str):
        col_1 = row['column_1'].split(',')
        col_2 = row['column_2'].split(',')
        # .at is needed so the list actually lands back in the dataframe
        df.at[index, 'column_3'] = ['_'.join(map(str, pair)) for pair in zip(col_1, col_2)]

I have converted the two columns to arrays in PySpark using the code below:

from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col, split

# Assign the results back; withColumn returns a new DataFrame and does not modify crash in place
crash = crash.withColumn(
    "column_1", split(col("column_1"), r",\s*").cast(ArrayType(StringType()))
)
crash = crash.withColumn(
    "column_2", split(col("column_2"), r",\s*").cast(ArrayType(StringType()))
)
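
With the results assigned back as above, a quick sanity check with printSchema should report both columns as array of string (other columns, if any, omitted; nullability flags may vary slightly by Spark version):

crash.printSchema()
# root
#  |-- column_1: array (nullable = true)
#  |    |-- element: string (containsNull = true)
#  |-- column_2: array (nullable = true)
#  |    |-- element: string (containsNull = true)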

Now all I need is to zip each pair of elements from the arrays in the two columns and join them with '_'. How can I use zip for this? Any help is appreciated.

Answer

A Spark SQL equivalent of Python's zip would be pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.

So if you already have two arrays:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*")))

you can apply arrays_zip to the result:

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)

+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+
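
Note that the struct fields keep the original column names, which is what the next step relies on; a quick schema check should look roughly like this (nullability flags may differ by Spark version):

df_zipped.select("zipped").printSchema()
# root
#  |-- zipped: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- column_1: string (nullable = true)
#  |    |    |-- column_2: string (nullable = true)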

Now, to combine the results, you can use transform (see How to use transform higher-order function? and TypeError: Column is not iterable - How to iterate over ArrayType()?):

from pyspark.sql.functions import expr

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)

df_zipped_concat.select("zipped_concat").show(truncate=False)

+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+
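
If the intermediate zipped column is not needed, the same result can be produced in one step by nesting arrays_zip inside the transform expression. A minimal sketch, assuming the column names from above and that arrays_zip keeps them as struct field names (df_concat is just an illustrative name):

from pyspark.sql.functions import expr

# Zip both arrays and join each pair with '_' in a single expression
df_concat = df.withColumn(
    "column_3",
    expr("transform(arrays_zip(column_1, column_2), "
         "x -> concat_ws('_', x.column_1, x.column_2))")
)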

Note:

Both the higher-order function transform and arrays_zip were introduced in Apache Spark 2.4.
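
For Spark versions before 2.4 (not covered above), one possible fallback is a plain Python UDF that zips the two arrays and joins each pair, at the cost of moving data through Python. A minimal sketch; zip_join and the output column name are illustrative:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Zip the two string arrays element-wise and join each pair with '_'
zip_join = udf(
    lambda xs, ys: ["_".join(pair) for pair in zip(xs, ys)]
    if xs is not None and ys is not None else None,
    ArrayType(StringType()),
)

df_udf = df.withColumn("zipped_concat", zip_join("column_1", "column_2"))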

