How can I merge these many csv files (around 130,000) using PySpark into one large dataset efficiently?


Question

I posted this question earlier and got some advice to use PySpark instead.

How do I merge this large dataset into one large dataframe efficiently?

The following zip file (https://fred.stlouisfed.org/categories/32263/downloaddata/INTRNTL_csv_2.zip) contains a folder called data with around 130,000 of csv files. I want to merge all of them into one single dataframe. I have 16gb of RAM and I keep running out of RAM when I hit the first few hundred files. The files' total size is only about 300-400 mb of data.

If you open up any of the csv files, you can see that they all have the same format, the first column is for dates, and the second column is for the data series.

So now I am using PySpark instead, but I have no idea what the most efficient way to connect all the files is. With pandas dataframes I would just concat the list of individual frames like this, because I want them to merge on the dates:

bigframe = pd.concat(listofframes,join='outer', axis=0)

But like I mentioned, this method doesn't work as I run out of RAM really fast.

What would be the best way to do something similar using PySpark?

So far I have this (by the way, the filelist below is just a list of the files I want to pull out; you can ignore that):


import os
import pandas as pd
from functools import reduce

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()

listdf = []

for subdir, dirs, files in os.walk("/kaggle/input/filelist/"):
    for file in files:
        path = os.path.join(subdir, file)
        print(file)
        # each Excel file holds the list of csv files I want to pull out
        filelist = pd.read_excel("/kaggle/input/filelist/" + file)

        for row in filelist.File.items():
            # read one csv per series; rename VALUE to the file name (minus ".csv")
            df = spark.read.csv(f"/kaggle/input/master/{file[:-5]}/{file[:-5]}/data/" + row[1], inferSchema=True, header=True)
            df = df.select(col("DATE"), col("VALUE").alias(row[1][:-4]))
            df.show(3)
            listdf.append(df)

I stopped the code after it appended about 10 frames, but when I try the code below, it only ends up with one column of data; it doesn't merge properly.

bigframe = reduce(DataFrame.join(listdf, ['DATE'], how='full'))

But I am only left with 2 columns of data, the date and the first item in the list of spark frames.
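
I think what I actually need is a pairwise join folded over the list — something like this sketch (my own attempt, not tested beyond a few frames):

from functools import reduce
from pyspark.sql import DataFrame

# Fold the list of single-series frames into one wide frame,
# joining each pair on DATE with a full outer join
bigframe = reduce(
    lambda left, right: left.join(right, on='DATE', how='full'),
    listdf
)
bigframe.show(3)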

How do I merge everything into one frame properly? I want the Dates to be the thing index that the other columns merge on. Meaning if one frame has:

Date        TimeSeries1
1 Jan 2012  12345
2 Jan 2012  23456

and another has

Date        TimeSeries2
1 Jan 2012  5678
3 Jan 2012  8910

I want the final product to be

Date        TimeSeries1 TimeSeries2
1 Jan 2012  12345       5678
2 Jan 2012  23456
3 Jan 2012              8910

Also, to identify the columns, the names have to be changed to the name of the file.

Answer

There is a lot of stuff happening here, but if I can distill this to the need to merge data from 130k CSV files into one single DF, and capture the name for each file, you can do it like this.

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType

customSchema = StructType([
    StructField("asset_id", StringType(), True),
    StructField("price_date", StringType(), True),
    # ... add the rest of your columns here ...
    StructField("close_price", StringType(), True),
    StructField("filename", StringType(), True)])

fullPath = 'mnt/INTRNTL_csv_2/data/??/*.csv'

df = spark.read.format("csv") \
    .option("header", "false") \
    .option("sep", "|") \
    .schema(customSchema) \
    .load(fullPath) \
    .withColumn("filename", input_file_name())

Notice: The very first line of code and the very last line of code are used to get the file names. Also, pay attention to the wildcards; the '?' is for one single character (either a letter or a number) and the '*' is for any number of characters (any combination of letters and numbers).
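
If you then need the wide layout described in the question (one column per file, merged on the date), one option is to pivot the long frame on the file name. This is only a sketch using the column names from the customSchema above; I haven't run it against the full 130k files:

from pyspark.sql.functions import regexp_extract, first

# Derive a short series name from the full path returned by input_file_name()
named = df.withColumn("series", regexp_extract("filename", r"([^/]+)\.csv$", 1))

# One row per date, one column per file; dates missing from a file simply stay null.
# With ~130k distinct files you may need to raise spark.sql.pivotMaxValues
# or pass the list of pivot values explicitly.
wide = named.groupBy("price_date").pivot("series").agg(first("close_price"))
wide.show(3)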
