How to create a DataFrame out of rows while retaining existing schema?


Question


If I call map or mapPartition and my function receives rows from PySpark what is the natural way to create either a local PySpark or Pandas DataFrame? Something that combines the rows and retains the schema?


Currently I do something like:

def combine(partition):
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows, columns=rows[0].keys())
    pandafunc(dfpart)

mydf.rdd.mapPartitions(combine)

Answer

Spark >= 2.3.0


Since Spark 2.3.0 it is possible to use Pandas Series or DataFrame by partition or group. See for example:

  • Applying UDFs on GroupedData in PySpark (with functioning python example)
  • Efficient string suffix detection
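
The linked examples rely on Spark's grouped-map Pandas UDFs, whose contract is: each group arrives as an ordinary pandas DataFrame and the function returns a DataFrame with the declared schema. That per-group logic can be sketched in plain pandas (a minimal sketch assuming only pandas is installed; `center` and the sample frame are illustrative, not part of Spark's API):

```python
import pandas as pd

# Sample data standing in for one Spark DataFrame with columns (id, v)
pdf = pd.DataFrame({"id": [1, 1, 2], "v": [1.0, 2.0, 3.0]})

def center(group):
    # receives one group as a regular pandas DataFrame;
    # returns a DataFrame with the same columns (schema preserved)
    return group.assign(v=group.v - group.v.mean())

result = pdf.groupby("id", group_keys=False).apply(center)
print(result)
```

In Spark >= 2.3.0 the same function body would be wrapped with `pandas_udf` and applied via `groupBy(...).apply(...)`, with Spark handling the per-group slicing.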

Spark < 2.3.0


what is the natural way to create either a local PySpark


There is no such thing. Spark distributed data structures cannot be nested; or, if you prefer another perspective, you cannot nest actions or transformations.


or Pandas DataFrame


It is relatively easy, but you have to remember at least a few things:

  • Pandas and Spark DataFrames are not even remotely equivalent. They are different structures with different properties, and in general you cannot replace one with the other.
  • Partitions can be empty.
  • It looks like you're passing dictionaries. Remember that a base Python dictionary is unordered (unlike collections.OrderedDict, for example; plain dicts only guarantee insertion order from Python 3.7 on), so passing columns this way may not work as expected.
import pandas as pd

rdd = sc.parallelize([
    {"x": 1, "y": -1}, 
    {"x": -3, "y": 0},
    {"x": -0, "y": 4}
])

def combine(iter):
    rows = list(iter)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
##    x  y
## 0  1 -1
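
If column order matters (the dictionary caveat above), a variant of the same pattern pins the order explicitly. This is a sketch in plain Python; `columns` stands in for a schema-derived list such as `mydf.columns` on the Spark side:

```python
import pandas as pd

# Known schema order, e.g. taken from the Spark DataFrame's schema
columns = ["x", "y"]

def combine(iterator):
    rows = list(iterator)
    # passing `columns` makes the output order deterministic even if
    # the row dictionaries themselves are unordered; the empty-list
    # guard handles empty partitions
    return [pd.DataFrame(rows, columns=columns)] if rows else []

out = combine(iter([{"y": -1, "x": 1}, {"y": 0, "x": -3}]))[0]
print(out)
```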
