PySpark Dataframe melt columns into rows


Problem description

As the subject describes, I have a PySpark DataFrame in which I need to melt three columns into rows. Each column essentially represents a single fact in a category. The ultimate goal is to aggregate the data into a single total per category.

There are tens of millions of rows in this DataFrame, so I need a way to do the transformation on the Spark cluster without bringing any data back to the driver (Jupyter in this case).

Here is an extract of my dataframe for just a few stores:

+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

Here is the desired resulting dataframe, multiple rows per store, where the columns of the original dataframe have been melted into rows of the new dataframe, with one row per original column in a new category column:

+----------+--------+-----------+
|product_id|CATEGORY|qty_on_hand|
+----------+--------+-----------+
|       100|    milk|         30|
|       100|   bread|        105|
|       100|    eggs|         35|
|       200|    milk|         55|
|       200|   bread|         85|
|       200|    eggs|         65|
|       300|    milk|         20|
|       300|   bread|        125|
|       300|    eggs|         90|
+----------+--------+-----------+

Ultimately, I want to aggregate the resulting dataframe to get the totals per category:

+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    milk|              105|
|   bread|              315|
|    eggs|              190|
+--------+-----------------+

UPDATE: There is a suggestion that this question is a duplicate and can be answered here. This is not the case, as the solution casts rows to columns and I need to do the reverse, melt columns into rows.

Answer

We can use the explode() function to solve this issue. In pandas, the same thing can be done with melt().
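
For comparison, a minimal pandas sketch of that melt, assuming the same column names as the Spark example (illustrative only):

# Minimal pandas sketch of the same reshape (assumes pandas is available)
import pandas as pd

pdf = pd.DataFrame({
    'store_id': [100, 200, 300],
    'qty_on_hand_milk': [30, 55, 20],
    'qty_on_hand_bread': [105, 85, 125],
    'qty_on_hand_eggs': [35, 65, 90],
})
# Melt the three qty columns into rows, then total per category
melted = pdf.melt(id_vars='store_id', var_name='CATEGORY', value_name='qty_on_hand')
totals = melted.groupby('CATEGORY', as_index=False)['qty_on_hand'].sum()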

# Loading the requisite packages
from pyspark.sql.functions import col, explode, array, struct, expr, sum, lit
# Creating the DataFrame
df = sqlContext.createDataFrame([(100,30,105,35),(200,55,85,65),(300,20,125,90)],('store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'))
df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

Writing the function below, which will explode this DataFrame -

def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("CATEGORY"), col(c).alias("qty_on_hand")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.CATEGORY", "kvs.qty_on_hand"])

Applying the function on this DataFrame to explode it -

df = to_explode(df, ['store_id'])\
     .drop('store_id')
df.show()
+-----------------+-----------+
|         CATEGORY|qty_on_hand|
+-----------------+-----------+
| qty_on_hand_milk|         30|
|qty_on_hand_bread|        105|
| qty_on_hand_eggs|         35|
| qty_on_hand_milk|         55|
|qty_on_hand_bread|         85|
| qty_on_hand_eggs|         65|
| qty_on_hand_milk|         20|
|qty_on_hand_bread|        125|
| qty_on_hand_eggs|         90|
+-----------------+-----------+

Now, we need to remove the string qty_on_hand_ from the CATEGORY column. This can be done with the expr() function. Note that expr uses 1-based indexing for substring, as opposed to 0-based -

df = df.withColumn('CATEGORY',expr('substring(CATEGORY, 13)'))
df.show()
+--------+-----------+
|CATEGORY|qty_on_hand|
+--------+-----------+
|    milk|         30|
|   bread|        105|
|    eggs|         35|
|    milk|         55|
|   bread|         85|
|    eggs|         65|
|    milk|         20|
|   bread|        125|
|    eggs|         90|
+--------+-----------+

Finally, aggregate the qty_on_hand column, grouped by CATEGORY, using the agg() function -

df = df.groupBy(['CATEGORY']).agg(sum('qty_on_hand').alias('total_qty_on_hand'))
df.show()
+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    eggs|              190|
|   bread|              315|
|    milk|              105|
+--------+-----------------+
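
As an aside, the same unpivot can also be written with Spark SQL's stack() expression. A minimal sketch, applied to the original wide DataFrame (called wide_df here, since df has been overwritten above) with the same column names:

# Sketch using Spark SQL's stack() expression; wide_df stands for the original
# DataFrame with store_id and the three qty_on_hand_* columns
from pyspark.sql.functions import sum as sum_

melted = wide_df.selectExpr(
    "store_id",
    "stack(3, 'milk', qty_on_hand_milk, 'bread', qty_on_hand_bread, 'eggs', qty_on_hand_eggs) AS (CATEGORY, qty_on_hand)"
)
melted.groupBy('CATEGORY').agg(sum_('qty_on_hand').alias('total_qty_on_hand')).show()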
