Import pyspark dataframe from multiple S3 buckets, with a column denoting which bucket the entry came from
Problem description
I have a list of S3 buckets partitioned by date. The first bucket is titled 2019-12-1, the second 2019-12-2, etc.
Each of these buckets stores parquet files that I am reading into a pyspark dataframe. The dataframes generated from these buckets all have exactly the same schema. What I would like to do is iterate over these buckets and store all of these parquet files in a single pyspark dataframe that has a date column denoting which bucket each entry in the dataframe actually came from.
Because the schema of the dataframe generated when importing each bucket separately is many layers deep (i.e. each row contains structs of arrays of structs, etc.), I imagine the only way to combine all the buckets into one dataframe is to have a dataframe with a single 'dates' column. Each row of the 'dates' column would hold the contents of the corresponding S3 bucket for that date.
I can read all the dates with this line:
df = spark.read.parquet("s3://my_bucket/*")
I've seen someone achieve what I'm describing by appending a withColumn call to this line to create a 'dates' column, but I can't remember how.
Answer
Using input_file_name(), you can extract the S3 bucket name from the file path:
from pyspark.sql.functions import input_file_name, regexp_replace, split

df.withColumn("dates", split(regexp_replace(input_file_name(), "s3://", ""), "/").getItem(0)) \
  .show()
We strip the scheme, split the file path on "/", and take the first part, which corresponds to the bucket name.
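The same path-splitting logic can be checked outside Spark. Here is a minimal pure-Python sketch of what the column expression computes per row (str.split stands in for Spark's split function; the example path is made up):

```python
def bucket_from_path(path: str) -> str:
    """Strip the s3:// scheme and take the first path segment, i.e. the bucket name."""
    return path.replace("s3://", "", 1).split("/")[0]

print(bucket_from_path("s3://2019-12-1/data/part-0000.parquet"))  # → 2019-12-1
```

Since the buckets are named by date, the extracted bucket name doubles as the 'dates' value for each row.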
This can also be done with the regex s3://(.+?)/(.+), where the first capture group is the bucket name:
from pyspark.sql.functions import input_file_name, regexp_extract

df.withColumn("dates", regexp_extract(input_file_name(), "s3://(.+?)/(.+)", 1)).show()
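The regex itself can be exercised with Python's standard re module before wiring it into Spark; a small sketch (the sample path is hypothetical):

```python
import re

# Non-greedy first group stops at the first "/" after the scheme,
# so group 1 captures only the bucket name.
S3_PATTERN = re.compile(r"s3://(.+?)/(.+)")

def bucket_from_path(path: str):
    """Return the bucket name from an s3:// URI, or None if it doesn't match."""
    match = S3_PATTERN.match(path)
    return match.group(1) if match else None

print(bucket_from_path("s3://2019-12-2/data/part-0001.parquet"))  # → 2019-12-2
```

Note that regexp_extract uses Java regex syntax at runtime, but for this simple pattern the behavior matches Python's re.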