Pyspark: Load a tar.gz file into a dataframe and filter by filename
Question
I have a tar.gz file that contains multiple files. The hierarchy looks as below. My intention is to read the tar.gz file and filter out the contents of b.tsv, since it is static metadata, while all the other files are actual records.
gzfile.tar.gz
|- a.tsv
|- b.tsv
|- (thousands more files)
Using pyspark, I'm able to load the file into a dataframe. I used the command:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Loading Gzip Files") \
    .getOrCreate()

input = spark.read.load('/Users/jeevs/git/data/gzfile.tar.gz',
                        format='com.databricks.spark.csv',
                        sep='\t')
With the intention to filter, I added the filename column:
from pyspark.sql.functions import input_file_name

input = input.withColumn("filename", input_file_name())
This now yields data like:
|_c0 |_c1 |filename |
|b.tsv0000666000076500001440035235677713575350214013124 0ustar netsaintusers1|Lynx 2.7.1|file:///Users/jeevs/git/data/gzfile.tar.gz|
|2|Lynx 2.7|file:///Users/jeevs/git/data/gzfile.tar.gz|
Of course, the filename field is populated with the tar.gz file path, making that approach useless. A more irritating problem is that _c0 is getting populated with the filename + garbage + first-row values.
At this point, I'm wondering if the file read itself is getting weird because it is a tar.gz file. When we did v1 of this processing (Spark 0.9), we had another step that loaded the data from S3 onto an EC2 box, extracted it, and wrote it back to S3. I'm trying to get rid of those steps.
Thanks in advance!
Answer
Databricks does not support direct *.tar.gz iteration. To process the files, they have to be unpacked into a temporary location. Databricks supports bash, which can do the job.
%sh find $source -name '*.tar.gz' -exec tar -xvzf {} -C $destination \;
The code above unpacks every file with the *.tar.gz extension under source into the destination location. If the path is passed via dbutils.widgets, or set statically in %scala or %pyspark, it must be declared as an environment variable. This can be done in %pyspark:
import os
os.environ['source'] = '/dbfs/mnt/dl/raw/source/'
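If you'd rather stay entirely in Python instead of shelling out, the standard-library tarfile module can do the same extraction. A minimal sketch, assuming the archives sit under a source directory and should be unpacked into a destination directory (both paths are placeholders, not from the original answer):

```python
import os
import tarfile

def extract_all(source, destination):
    """Extract every *.tar.gz archive found in source into destination."""
    os.makedirs(destination, exist_ok=True)
    for name in os.listdir(source):
        if name.endswith('.tar.gz'):
            with tarfile.open(os.path.join(source, name), 'r:gz') as tar:
                tar.extractall(destination)
```

This mirrors what the `%sh find ... tar -xvzf` one-liner does, but keeps the logic in the same notebook language as the rest of the job.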
Assuming the content is in *.csv files, use the following to load a file:
df = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .option("mode", "DROPMALFORMED") \
    .load('/mnt/dl/raw/source/sample.csv')
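To get back to the original goal of excluding b.tsv, once the archives are extracted you can filter the file list before handing it to Spark, rather than loading everything and filtering afterwards. A minimal sketch of that idea (the directory layout and the helper name are assumptions, not part of the original answer):

```python
from pathlib import Path

def data_files(extracted_dir, exclude=('b.tsv',)):
    """Return sorted paths of extracted .tsv files, skipping static metadata."""
    return sorted(str(p) for p in Path(extracted_dir).glob('*.tsv')
                  if p.name not in exclude)
```

The resulting list can be passed directly to `spark.read.csv(paths, sep='\t')`, which accepts a list of paths. Alternatively, load the whole directory and drop the metadata rows afterwards with something like `df.withColumn("filename", input_file_name()).filter(~col("filename").endswith("b.tsv"))`; once the files are extracted, `input_file_name()` reports the individual file paths rather than the archive path, so the filter works as intended.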