Pyspark read multiple csv files into a dataframe (OR RDD?)

Problem description

I've got a Spark 2.0.2 cluster that I'm hitting via PySpark through a Jupyter Notebook. I have multiple pipe-delimited txt files (loaded into HDFS, but also available in a local directory) that I need to load using spark-csv into three separate dataframes, depending on the name of the file.

I see three approaches I can take. First, I could use Python to iterate through the HDFS directory somehow (I haven't figured out how to do this yet), load each file, and then do a union.

Second, I know that there is some wildcard functionality in Spark (see here) that I can probably leverage.

Lastly, I could use pandas to load the vanilla csv files from disk as pandas dataframes and then create the Spark dataframes. The downside here is that these files are large, and loading them into memory on a single node could take ~8 GB (that's why this is moving to a cluster in the first place).

Here is the code I have so far, plus some pseudocode for two of the methods:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')


#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
    pd_df = pd.read_csv(currFile, sep = '|')
    df = spark.createDataFrame(pd_df)
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

Does anyone know how to implement method 1 or 2? I haven't been able to figure these out. Also, I was surprised that there isn't a better way to get csv files loaded into a PySpark dataframe; using a third-party package for something that seems like it should be a native feature confused me (did I just miss the standard use case for loading csv files into a dataframe?). Ultimately, I'm going to write a single consolidated dataframe back to HDFS (using .write.parquet()) so that I can then clear the memory and do some analytics with MLlib. If the approach I've highlighted isn't best practice, I would appreciate a push in the right direction!

Recommended answer

Method 1:

In Python you cannot directly refer to an HDFS location; you need the help of another library such as pydoop (in Scala and Java there are APIs for this). Even with pydoop, you would still be reading the files one by one. Reading files one by one, rather than using the parallel reading that Spark provides, is a bad approach.
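
For reference, a minimal sketch of what that listing-and-union loop could look like, assuming pydoop is installed and that the file names really do contain 'claim', 'pharm', or 'service' (the directory, reader options, and name fragments are taken from the question, not verified). As noted above, the files are still read serially:

import pydoop.hdfs as hdfs

# Reuse the spark-csv options from the question for every file.
reader = (spark.read
          .format('com.databricks.spark.csv')
          .options(delimiter='|', header='true', nullValue='null'))

claim_df = pharm_df = service_df = None

for path in hdfs.ls('/someDir'):        # list the HDFS directory
    df = reader.load(path)              # one file at a time (serial)
    if 'claim' in path:
        claim_df = df if claim_df is None else claim_df.unionAll(df)
    elif 'pharm' in path:
        pharm_df = df if pharm_df is None else pharm_df.unionAll(df)
    elif 'service' in path:
        service_df = df if service_df is None else service_df.unionAll(df)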

Method 2:

You should be able to point to multiple files with a comma-separated list of paths or with a wildcard; that way Spark takes care of reading the files and distributing them into partitions. But if you instead read each file dynamically and union the dataframes, there is an edge case: when you have a lot of files, the list can become huge at the driver level and cause memory issues, mainly because that read process still happens at the driver level.

The wildcard option is better. Spark will read all the files matching the pattern and convert them into partitions. You get one RDD for all the wildcard matches, and from there you don't need to worry about unioning the individual RDDs.

Sample code snippet:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")
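
Applied to the questioner's spark-csv reads, the wildcard version might look like the sketch below. The glob patterns (*claim*.csv and so on) are an assumption about the file naming; if the names don't follow a pattern, .load() also accepts a list of explicit paths:

claim_df = (spark.read
            .format('com.databricks.spark.csv')
            .options(delimiter='|', header='true', nullValue='null')
            .load('hdfs:///someDir/*claim*.csv'))      # Spark expands the glob

pharm_df = (spark.read
            .format('com.databricks.spark.csv')
            .options(delimiter='|', header='true', nullValue='null')
            .load('hdfs:///someDir/*pharm*.csv'))

service_df = (spark.read
              .format('com.databricks.spark.csv')
              .options(delimiter='|', header='true', nullValue='null')
              .load('hdfs:///someDir/*service*.csv'))

# Each call returns a single DataFrame already split into partitions,
# so no per-file unionAll is needed.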

Method 3:

Unless you have a legacy application in Python that depends on pandas features, I would prefer using the Spark-provided API.
