How to do opposite of explode in PySpark?

Question

Let's say I have a DataFrame with a column for users and another column for words they've written:

Row(user='Bob', word='hello')
Row(user='Bob', word='world')
Row(user='Mary', word='Have')
Row(user='Mary', word='a')
Row(user='Mary', word='nice')
Row(user='Mary', word='day')

I would like to aggregate the word column into a vector:

Row(user='Bob', words=['hello','world'])
Row(user='Mary', words=['Have','a','nice','day'])

It seems I can't use any of Spark's grouping functions, because they expect a subsequent aggregation step. My use case is that I want to feed these data into Word2Vec, not to use other Spark aggregations.
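
For reference, the straightforward grouping-plus-aggregation route is collect_list; a minimal sketch (note that collect_list makes no guarantee about element order, which is the limitation the answer below addresses):

from pyspark.sql.functions import collect_list

# Group by user and collect the words into one array per group.
# Element order within the array is not guaranteed.
df.groupBy('user').agg(collect_list('word').alias('words')).show(truncate=False)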

Recommended answer

As of the Spark 2.3 release we now have Pandas UDFs (a.k.a. vectorized UDFs). The function below will accomplish the OP's task... A benefit of using this function is that the order is guaranteed to be preserved. Order is essential in many cases, such as time series analysis.

import pandas as pd
import findspark

findspark.init()
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType

spark = SparkSession.builder.appName('test_collect_array_grouped').getOrCreate()

def collect_array_grouped(df, groupbyCols, aggregateCol, outputCol):
    """
    Aggregate function: returns a new :class:`DataFrame` that, for a given column, aggregateCol,
    in a DataFrame, df, collects the elements into an array for each grouping defined by the groupbyCols list.
    The new DataFrame will have, for each row, the grouping columns and an array of the grouped
    values from aggregateCol in the outputCol.

    :param groupbyCols: list of columns to group by.
            Each element should be a column name (string) or an expression (:class:`Column`).
    :param aggregateCol: the column name of the column of values to aggregate into an array
            for each grouping.
    :param outputCol: the column name of the column to output the aggregated array to.
    """
    groupbyCols = [] if groupbyCols is None else groupbyCols
    df = df.select(groupbyCols + [aggregateCol])
    schema = df.select(groupbyCols).schema
    aggSchema = df.select(aggregateCol).schema
    arrayField = StructField(name=outputCol, dataType=ArrayType(aggSchema[0].dataType, False))
    schema = schema.add(arrayField)
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def _get_array(pd_df):
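        # Take the group-by key values from the first row of the group,
        # then append the entire grouped column as a single array value.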
        vals = pd_df[groupbyCols].iloc[0].tolist()
        vals.append(pd_df[aggregateCol].values)
        return pd.DataFrame([vals])
    return df.groupby(groupbyCols).apply(_get_array)

rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)

collect_array_grouped(df, ['user'], 'word', 'users_words').show()

+----+--------------------+
|user|         users_words|
+----+--------------------+
|Mary|[Have, a, nice, day]|
| Bob|      [hello, world]|
+----+--------------------+
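
Since the goal was to feed this into Word2Vec, the grouped output can go straight in: pyspark.ml.feature.Word2Vec takes a column of arrays of strings. A minimal sketch (the vectorSize and minCount values here are illustrative, not from the original post):

from pyspark.ml.feature import Word2Vec

# 'users_words' is already an array-of-strings column, which is the
# input format Word2Vec expects.
result = collect_array_grouped(df, ['user'], 'word', 'users_words')
word2vec = Word2Vec(vectorSize=50, minCount=0, inputCol='users_words', outputCol='vectors')
model = word2vec.fit(result)
model.getVectors().show()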
