How to create a DataFrame out of rows while retaining existing schema?

Problem Description

If I call map or mapPartitions and my function receives rows from PySpark, what is the natural way to create either a local PySpark or Pandas DataFrame? Something that combines the rows and retains the schema?

Currently what I do is:

def combine(partition):
    # Collect this partition's rows into a list
    rows = [x for x in partition]
    # Build a local pandas DataFrame, taking column names from the first row
    dfpart = pd.DataFrame(rows, columns=rows[0].keys())
    pandafunc(dfpart)

# mapPartitions is an RDD method, so it is called on the underlying RDD
mydf.rdd.mapPartitions(combine)

Recommended Answer

Spark >= 2.3.0

Since Spark 2.3.0 it is possible to use a Pandas Series or DataFrame per partition or per group, for example with a scalar or grouped-map pandas_udf.
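
As a minimal sketch (the SparkSession spark, the toy DataFrame, and the subtract_mean function are illustrative, not from the original post), a grouped-map pandas_udf receives each group as a pandas DataFrame and returns a pandas DataFrame matching the declared schema:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)],
    ("id", "v"),
)

# Each group arrives as a pandas DataFrame; the return value must be a
# pandas DataFrame whose columns match the schema declared in the decorator.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupBy("id").apply(subtract_mean).show()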

Spark < 2.3.0

what is the natural way to create either a local PySpark

There is no such thing. Spark distributed data structures cannot be nested, or, if you prefer another perspective, you cannot nest actions or transformations.

or Pandas DataFrame

It is relatively easy, but you have to remember at least a few things:

  • Pandas and Spark DataFrames are not even remotely equivalent. They are different structures with different properties, and in general you cannot replace one with the other.
  • Partitions can be empty.
  • It looks like you're passing dictionaries. Remember that a plain Python dictionary is unordered (unlike collections.OrderedDict, for example), so the column order may not come out as expected (see the sketch after the example below).
import pandas as pd

rdd = sc.parallelize([
    {"x": 1, "y": -1}, 
    {"x": -3, "y": 0},
    {"x": -0, "y": 4}
])

def combine(iterator):
    # Materialize the partition; return an empty list for empty partitions
    rows = list(iterator)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
##    x  y
## 0  1 -1
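
Regarding the column-order caveat above, a minimal sketch (assuming the rows come from the question's mydf; combine_ordered is a hypothetical helper, not part of the original answer) that pins the column order to the DataFrame's own schema instead of relying on dictionary ordering:

# Take the column order from the source DataFrame's schema so the local
# pandas DataFrame lines up with the existing schema.
columns = [field.name for field in mydf.schema.fields]

def combine_ordered(iterator):
    rows = list(iterator)
    return [pd.DataFrame([row.asDict() for row in rows], columns=columns)] if rows else []

mydf.rdd.mapPartitions(combine_ordered).first()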
