How to save a huge pandas dataframe to hdfs?
Question
I'm working with pandas and with Spark dataframes. The dataframes are always very big (> 20 GB) and the standard Spark functions are not sufficient for those sizes. Currently I'm converting my pandas dataframe to a Spark dataframe like this:
dataframe = spark.createDataFrame(pandas_dataframe)
I do that transformation because with Spark writing dataframes to HDFS is very easy:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
But the transformation fails for dataframes that are bigger than 2 GB. If I transform a Spark dataframe to pandas I can use pyarrow:
import pyarrow as pa
import pyarrow.parquet as pq

# temporarily write the spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
# open an hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
# read the parquet files back with pyarrow.parquet (pq)
parquet = pq.ParquetDataset(path, filesystem=hdfs)
table = parquet.read(nthreads=4)
# transform the arrow table to a pandas dataframe
pandas_dataframe = table.to_pandas(nthreads=4)
# delete the temporary files
hdfs.delete(path, recursive=True)
This is a fast conversion from Spark to pandas and it also works for dataframes bigger than 2 GB. I could not yet find a way to do it the other way around, meaning having a pandas dataframe which I transform to Spark with the help of pyarrow. The problem is that I really can't find how to write a pandas dataframe to HDFS.
My pandas version: 0.19.0
Answer
Meaning having a pandas dataframe which I transform to spark with the help of pyarrow.
pyarrow.Table.from_pandas is the function you're looking for:
Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)
Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa
pdf = ... # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table
The result can be written directly to Parquet / HDFS without passing data via Spark:
import pyarrow.parquet as pq

fs = pa.hdfs.connect()
with fs.open(path, "wb") as fw:
    pq.write_table(adf, fw)
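For reference, here is a minimal end-to-end sketch of the same idea as a single helper; the helper name, the host/port arguments, and the example path are assumptions for illustration, not part of the original answer:

import pyarrow as pa
import pyarrow.parquet as pq

def pandas_to_hdfs_parquet(pdf, hdfs_path, host="default", port=0):
    # convert the pandas DataFrame to an Arrow Table
    table = pa.Table.from_pandas(pdf)
    # open an HDFS connection via pyarrow and write the table as Parquet
    fs = pa.hdfs.connect(host, port)
    with fs.open(hdfs_path, "wb") as fw:
        pq.write_table(table, fw)

# hypothetical usage:
# pandas_to_hdfs_parquet(pandas_dataframe, "/tmp/output.parquet")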
See also:
- @WesMcKinney answer to read parquet files from HDFS using PyArrow.
- Reading and Writing the Apache Parquet Format in the pyarrow documentation (https://arrow.apache.org/docs/python/index.html).
- Native Hadoop file system (HDFS) connectivity in Python
Spark notes:
Furthermore, since Spark 2.3 (current master) Arrow is supported directly in createDataFrame (SPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame). It uses SparkContext.defaultParallelism to compute the number of chunks, so you can easily control the size of individual batches.
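As a short sketch of what that looks like in practice: the configuration key below is the Arrow flag introduced around SPARK-20791, while the session name, example frame, and output path are placeholders:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("arrow-conversion").getOrCreate()
# enable Arrow-based conversion between pandas and Spark (Spark 2.3+)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = pd.DataFrame({"id": range(1000)})  # hypothetical pandas frame
sdf = spark.createDataFrame(pdf)  # uses Arrow under the hood when enabled
sdf.write.parquet("/tmp/output.parquet", mode="overwrite", compression="snappy")  # hypothetical path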
Finally, defaultParallelism can be used to control the number of partitions generated using the standard _convert_from_pandas, effectively reducing the size of the slices to something more manageable.
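As a rough illustration, the parallelism can be set through the standard Spark configuration before the conversion; the value 64 is an arbitrary placeholder, not a recommendation from the answer:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.default.parallelism", "64")  # placeholder value
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# createDataFrame will now split the local pandas frame into roughly that many slices
sdf = spark.createDataFrame(pandas_dataframe)  # pandas_dataframe as defined in the question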
Unfortunately these are unlikely to resolve your current memory problems. Both depend on parallelize, and therefore store all data in the memory of the driver node. Switching to Arrow or adjusting the configuration can only speed up the process or address block size limitations.
In practice I don't see any reason to switch to Spark here, as long as you use a local pandas DataFrame as the input. The most severe bottleneck in this scenario is the driver's network I/O, and distributing the data won't address that.