pySpark convert result of mapPartitions to spark DataFrame


Problem description

I have a job that needs to run on a partitioned Spark DataFrame, and the process looks like:

rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))

The result is a pandas.DataFrame:

type(rdd) => pyspark.rdd.PipelinedRDD
type(rdd.collect()[0]) => pandas.core.frame.DataFrame
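(For context, a minimal sketch of what some_function might look like to produce this state; the body below is an assumption, not part of the question: it builds one pandas DataFrame from a partition's rows and yields it, so the RDD ends up holding one DataFrame per partition.)

import pandas as pd

def some_function(iterator):
    # Hypothetical implementation: gather the partition's Row objects into a
    # single pandas DataFrame and yield it, giving an RDD of pandas DataFrames.
    rows = [row.asDict() for row in iterator]
    yield pd.DataFrame(rows)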

rdd.glom().collect() returns a result like this:

[[df1], [df2], ...]

Now I want to convert the result to a Spark DataFrame, and the way I did it is:

# Collect every per-partition pandas DataFrame to the driver and union them one by one
sp = None
for i, partition in enumerate(rdd.collect()):
    if i == 0:
        sp = spark.createDataFrame(partition)
    else:
        sp = sp.union(spark.createDataFrame(partition))

return sp

However, the result could be huge, and rdd.collect() may exceed the driver's memory, so I need to avoid the collect() operation. Is there a way to address this problem?

Thanks in advance!

Recommended answer

If you want to stay with the RDD API: mapPartitions accepts an iterator of one type and expects an iterator of another type as its result. A pandas DataFrame is not an iterator type that mapPartitions can deal with directly. If you must work with the pandas API, you can simply create a proper generator from DataFrame.iterrows().

This way your overall mapPartitions result will be a single RDD of your row type instead of an RDD of pandas DataFrames. Such an RDD can be seamlessly converted back into a DataFrame with on-the-fly schema discovery:

from pyspark.sql import Row

def some_function(iterator):
    # some_pandas_result stands for whatever currently builds the per-partition
    # pandas DataFrame; instead of yielding that DataFrame, yield one Row per
    # pandas row so the resulting RDD contains plain Rows.
    pandas_df = some_pandas_result(iterator)
    for index, row in pandas_df.iterrows():
        yield Row(id=index, foo=row['foo'], bar=row['bar'])


rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)
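
If you prefer not to rely on schema inference (it samples the data and may, for example, pick types you do not want), createDataFrame also accepts an explicit schema. A minimal sketch, assuming the id/foo/bar fields from the example above with placeholder types:

from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

# Field types here are assumptions for illustration; the order should match the
# order of the fields in the Row objects produced above.
schema = StructType([
    StructField("id", LongType(), True),
    StructField("foo", StringType(), True),
    StructField("bar", DoubleType(), True),
])

df = spark.createDataFrame(rdd, schema)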
