How do I convert multiple Pandas DFs into a single Spark DF?
Question
I have several Excel files that I need to load and pre-process before loading them into a Spark DF. I have a list of these files that need to be processed. I do something like this to read them in:
file_list_rdd = sc.emptyRDD()

for file_path in file_list:
    current_file_rdd = sc.binaryFiles(file_path)
    print(current_file_rdd.count())
    file_list_rdd = file_list_rdd.union(current_file_rdd)
I then have a mapper function that turns file_list_rdd from a set of (path, bytes) tuples into (path, Pandas DataFrame) tuples. This allows me to use Pandas to read the Excel files and manipulate them so that they're uniform before making them into a Spark DataFrame.
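That mapper isn't shown in the question; a minimal sketch of what it might look like (assuming each file's bytes can be parsed with pd.read_excel through an in-memory buffer; the function name is hypothetical) is:

```python
import io

import pandas as pd


def read_excel_mapper(path_bytes):
    """Turn a (path, bytes) tuple into a (path, pandas.DataFrame) tuple.

    sc.binaryFiles yields the file contents as raw bytes, so they are
    wrapped in a BytesIO buffer before being handed to pandas.
    """
    path, content = path_bytes
    pd_df = pd.read_excel(io.BytesIO(content))
    return path, pd_df


# Applied to the RDD built above:
# processed_excel_rdd = file_list_rdd.map(read_excel_mapper)
```

Any column renaming or type coercion needed to make the frames uniform would go inside this function, before the tuple is returned.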
How do I take an RDD of (file path, Pandas DF) tuples and turn it into a single Spark DF? I'm aware of functions that can do a single transformation, but not one that can do several.
My first attempt was something like this:
sqlCtx = SQLContext(sc)

def convert_pd_df_to_spark_df(item):
    return sqlCtx.createDataFrame(item[0][1])

processed_excel_rdd.map(convert_pd_df_to_spark_df)
I'm guessing that didn't work because sqlCtx isn't distributed with the computation (it's a guess because the stack trace doesn't make much sense to me).
Thanks in advance for taking the time to read :).
Answer
I solved this by writing a function like this:
from pyspark.sql import Row

def pd_df_to_row(rdd_row):
    key = rdd_row[0]
    pd_df = rdd_row[1]

    rows = list()
    for index, series in pd_df.iterrows():
        # Take a row of the df, export it as a dict, and pass the unpacked dict into the Row constructor
        row_dict = {str(k): v for k, v in series.to_dict().items()}
        rows.append(Row(**row_dict))

    return rows
You can invoke it by calling something like:
processed_excel_rdd = processed_excel_rdd.flatMap(pd_df_to_row)
processed_excel_rdd now holds a collection of Spark Row objects. You can now say:
processed_excel_rdd.toDF()
There's probably something more efficient than the Series -> dict -> Row operation, but this got me through.
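One such alternative (a sketch, not what this answer used, and assuming the pre-processed frames share a schema and are small enough to collect to the driver) is to concatenate everything in pandas first and call createDataFrame once; the helper name is hypothetical:

```python
import pandas as pd


def combine_pandas_frames(path_df_pairs):
    """Concatenate the pandas DataFrames from (path, DataFrame) pairs into one.

    Assumes every frame has already been normalized to the same columns.
    """
    frames = [pd_df for _path, pd_df in path_df_pairs]
    return pd.concat(frames, ignore_index=True)


# On the driver:
# combined = combine_pandas_frames(processed_excel_rdd.collect())
# spark_df = sqlCtx.createDataFrame(combined)
```

This trades the per-row Row construction for a single schema-inference pass, at the cost of materializing all the data on the driver.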