How can I concatenate the rows in a pyspark dataframe with multiple columns using groupby and aggregate


Question

I have a pyspark dataframe with multiple columns. For example the one below.

from pyspark.sql import Row

# sc (SparkContext) and sqlContext are assumed to already exist,
# e.g. in a pyspark shell.
l = [('Jack', "a", "p"), ('Jack', "b", "q"), ('Bell', "c", "r"), ('Bell', "d", "s")]
rdd = sc.parallelize(l)
score_rdd = rdd.map(lambda x: Row(name=x[0], letters1=x[1], letters2=x[2]))
score_card = sqlContext.createDataFrame(score_rdd)

+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack|       a|       p|
|Jack|       b|       q|
|Bell|       c|       r|
|Bell|       d|       s|
+----+--------+--------+

Now I want to group by "name" and concatenate the values in every row for both columns. I know how to do it, but if there are thousands of columns my code becomes very ugly. Here is my solution.

import pyspark.sql.functions as f

t = score_card.groupby("name").agg(
    f.concat_ws("", f.collect_list("letters1")).alias("letters1"),
    f.concat_ws("", f.collect_list("letters2")).alias("letters2")
)

Here is the output I get when I save it in a CSV file.

+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack|      ab|      pq|
|Bell|      cd|      rs|
+----+--------+--------+

But my main concern is about these two lines of code:

f.concat_ws("",collect_list("letters1").alias("letters1")),
f.concat_ws("",collect_list("letters2").alias("letters2"))

If there are thousands of columns then I will have to repeat the above code thousands of times. Is there a simpler solution for this so that I don't have to repeat f.concat_ws() for every column?

I have searched everywhere and haven't been able to find a solution.

Answer

Yes, you can use a for loop (here, a list comprehension) inside the agg function and iterate over df.columns. Let me know if it helps.

    from pyspark.sql import functions as F
    df.show()

    # +--------+--------+----+
    # |letters1|letters2|name|
    # +--------+--------+----+
    # |       a|       p|Jack|
    # |       b|       q|Jack|
    # |       c|       r|Bell|
    # |       d|       s|Bell|
    # +--------+--------+----+

    df.groupBy("name").agg( *[F.array_join(F.collect_list(column), "").alias(column) for column in df.columns if column !='name' ]).show()

    # +----+--------+--------+
    # |name|letters1|letters2|
    # +----+--------+--------+
    # |Bell|      cd|      rs|
    # |Jack|      ab|      pq|
    # +----+--------+--------+
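
Note that F.array_join was added in Spark 2.4. If you are on an older version, the same list comprehension works with the concat_ws / collect_list pattern from the question; here is a minimal sketch, assuming the same df and F import as above:

    # Equivalent result using concat_ws, which also flattens the collected
    # array into a single string (works on Spark versions before 2.4).
    df.groupBy("name").agg(
        *[F.concat_ws("", F.collect_list(column)).alias(column)
          for column in df.columns if column != 'name']
    ).show()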
