How do I convert multiple Pandas DFs into a single Spark DF?


Question

I have several Excel files that I need to load and pre-process before loading them into a Spark DF. I have a list of these files that need to be processed. I do something like this to read them in:

file_list_rdd = sc.emptyRDD()

for file_path in file_list:
    current_file_rdd = sc.binaryFiles(file_path)
    print(current_file_rdd.count())
    file_list_rdd = file_list_rdd.union(current_file_rdd)

I then have some mapper function that turns file_list_rdd from a set of (path, bytes) tuples to (path, Pandas DataFrame) tuples. This allows me to use Pandas to read the Excel file and to manipulate the files so that they're uniform before making them into a Spark DataFrame.
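For context, a minimal sketch of what such a mapper could look like, assuming pandas (plus an Excel engine such as openpyxl or xlrd) is available on the executors; the function name and the cleanup placeholder are illustrative, not the exact code used:

import io
import pandas as pd

def bytes_to_pandas(path_and_bytes):
    # path_and_bytes is one (file path, raw bytes) record from sc.binaryFiles
    path, raw = path_and_bytes
    # pd.read_excel accepts a file-like object, so wrap the raw bytes
    pd_df = pd.read_excel(io.BytesIO(raw))
    # ... per-file cleanup to make the frames uniform would go here ...
    return (path, pd_df)

processed_excel_rdd = file_list_rdd.map(bytes_to_pandas)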

How do I take an RDD of (file path, Pandas DF) tuples and turn it into a single Spark DF? I'm aware of functions that can do a single transformation, but not one that can do several.

My first attempt was something like this:

sqlCtx = SQLContext(sc)

def convert_pd_df_to_spark_df(item):
    return sqlCtx.createDataFrame(item[0][1])

processed_excel_rdd.map(convert_pd_df_to_spark_df)

I'm guessing that didn't work because sqlCtx isn't distributed with the computation (it's a guess because the stack trace doesn't make much sense to me).
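That guess is in the right direction: SQLContext (and SparkSession) objects can only be used on the driver, not inside functions shipped to executors. One workaround sketch that stays entirely on the driver, assuming the collected Pandas DFs fit in driver memory and share a schema (the variable names here are illustrative):

from functools import reduce

# Everything below runs on the driver, so sqlCtx is never shipped to executors
pairs = processed_excel_rdd.collect()                        # [(path, pd_df), ...]
spark_dfs = [sqlCtx.createDataFrame(pd_df) for _path, pd_df in pairs]
combined_df = reduce(lambda a, b: a.unionAll(b), spark_dfs)  # union() on Spark 2+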

Thanks in advance for taking the time to read :).

Answer

I solved this by writing a function like this:

from pyspark.sql import Row

def pd_df_to_row(rdd_row):
    key = rdd_row[0]
    pd_df = rdd_row[1]

    rows = list()
    for index, series in pd_df.iterrows():
        # Take one row of the Pandas DF, export it as a dict, and pass the
        # unpacked dict into the Row constructor
        row_dict = {str(k): v for k, v in series.to_dict().items()}
        rows.append(Row(**row_dict))

    return rows

You can invoke it by calling something like:

processed_excel_rdd = processed_excel_rdd.flatMap(pd_df_to_row)

processed_excel_rdd now holds a collection of Spark Row objects. You can now say:

processed_excel_rdd.toDF()
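
(A small note: toDF() only becomes available on RDDs once a SQLContext or SparkSession has been created in the application, which the sqlCtx created earlier should take care of.)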

There's probably something more efficient than the Series -> dict -> Row operation, but this got me through.
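
One possible shortcut, assuming the goal is just to avoid the explicit iterrows() loop: pandas can emit one dict per row in a single call. A sketch, not benchmarked against the version above:

def pd_df_to_rows(rdd_row):
    _path, pd_df = rdd_row
    # to_dict('records') returns one dict per DataFrame row in a single pass
    return [Row(**{str(k): v for k, v in rec.items()})
            for rec in pd_df.to_dict('records')]

It plugs into the same flatMap(...).toDF() pipeline shown above.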
