How to create a DataFrame out of rows while retaining existing schema?
Question
If I call map or mapPartitions and my function receives rows from PySpark, what is the natural way to create either a local PySpark or pandas DataFrame? Something that combines the rows and retains the schema?
What I currently do is:
def combine(partition):
    # partition is an iterator over the rows of one partition
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows, columns=rows[0].keys())
    pandafunc(dfpart)

mydf.mapPartitions(combine)
Answer
Spark >= 2.3.0
Since Spark 2.3.0 it is possible to use a pandas Series or DataFrame per partition or per group, for example via pandas_udf with PandasUDFType.GROUPED_MAP.
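As a hedged sketch of the grouped-map approach: the function you apply per group is a plain pandas function, so it can be written and checked without a running cluster. The Spark wiring below (a SparkSession `spark` and a DataFrame `df` with long columns `x` and `y`) is assumed, not taken from the original answer, and is shown in comments.

```python
import pandas as pd

# The per-group function receives all rows of one group as a pandas
# DataFrame and must return a pandas DataFrame with the declared schema.
def subtract_mean(pdf):
    # Center y within the group
    return pdf.assign(y=pdf.y - pdf.y.mean())

# With Spark >= 2.3.0 the same function can be applied per group
# (assumed setup, uncomment inside a Spark session):
#
#   from pyspark.sql.functions import pandas_udf, PandasUDFType
#
#   @pandas_udf("x long, y double", PandasUDFType.GROUPED_MAP)
#   def subtract_mean(pdf):
#       return pdf.assign(y=pdf.y - pdf.y.mean())
#
#   df.groupby("x").apply(subtract_mean)

# Local check on plain pandas:
pdf = pd.DataFrame({"x": [1, 1], "y": [2.0, 4.0]})
out = subtract_mean(pdf)
```

Testing the pandas function locally first is useful because errors inside a pandas UDF surface as opaque executor failures on the cluster.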
Spark < 2.3.0
what is the natural way to create either a local PySpark
There is no such thing. Spark distributed data structures cannot be nested; or, if you prefer another perspective, you cannot nest actions or transformations.
or Pandas DataFrame
It is relatively easy, but you have to remember at least a few things:
- Pandas and Spark DataFrames are not even remotely equivalent. They are different structures with different properties, and in general you cannot replace one with the other.
- Partitions can be empty.
- It looks like you're passing dictionaries. Remember that before Python 3.7 a base Python dictionary did not preserve insertion order (unlike collections.OrderedDict, for example), so passing columns this way may not work as expected.
import pandas as pd

rdd = sc.parallelize([
    {"x": 1, "y": -1},
    {"x": -3, "y": 0},
    {"x": -0, "y": 4}
])

def combine(iterator):
    # Materialize the partition; yield nothing for empty partitions
    rows = list(iterator)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
## x y
## 0 1 -1
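The empty-partition caveat matters in practice. A minimal pandas-only sketch (no Spark required; partitions are simulated as plain Python lists, which is an assumption for illustration) shows why the rows check is needed:

```python
import pandas as pd

def combine(iterator):
    # Materialize the partition; a Spark partition may be empty.
    rows = list(iterator)
    # Return no DataFrame for an empty partition instead of an empty frame
    # (or worse, an IndexError from touching rows[0]).
    return [pd.DataFrame(rows)] if rows else []

# Simulate what mapPartitions would feed the function: one iterator
# per partition, including an empty one.
partitions = [
    [{"x": 1, "y": -1}, {"x": -3, "y": 0}],
    [],  # empty partition: combine must not fail here
]
frames = [df for part in partitions for df in combine(iter(part))]
```

Here `frames` contains a single DataFrame from the non-empty partition; a version that indexed `rows[0]` unconditionally would raise on the empty one.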