Pyspark: Load a tar.gz file into a dataframe and filter by filename


Problem description

I have a tar.gz file that contains multiple files. The hierarchy looks as below. My intention is to read the tar.gz file and filter out the contents of b.tsv, as it is static metadata, while all the other files are actual records.

gzfile.tar.gz
|- a.tsv
|- b.tsv
|- thousand more files.

With pyspark's load, I'm able to load the file into a dataframe. I used the command:

from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("Loading Gzip Files").\
        getOrCreate()

# read the archive as a tab-separated csv file
input = spark.read.load('/Users/jeevs/git/data/gzfile.tar.gz',
                        format='com.databricks.spark.csv',
                        sep='\t')

With the intention to filter, I added the filename:

from pyspark.sql.functions import input_file_name

# withColumn returns a new dataframe, so the result must be reassigned
input = input.withColumn("filename", input_file_name())

The data now comes out like this:

|_c0 |_c1 |filename |
|b.tsv0000666000076500001440035235677713575350214013124 0ustar  netsaintusers1|Lynx 2.7.1|file:///Users/jeevs/git/data/gzfile.tar.gz|
|2|Lynx 2.7|file:///Users/jeevs/git/data/gzfile.tar.gz|

Of course, the filename field is populated with the tar.gz path, making that approach useless. A more irritating problem is that _c0 is getting populated with filename + garbage + first-row values.

At this point, I'm wondering if the file read itself is getting weird because it is a tar.gz file. When we did v1 of this processing (Spark 0.9), we had another step that loaded the data from s3 onto an ec2 box, extracted it, and wrote it back into s3. I'm trying to get rid of those steps.

Thanks in advance!

Recommended answer

Databricks does not support direct *.tar.gz iteration. To process the files, they first have to be extracted into a temporary location. Databricks supports bash, which can do the job:

%sh find $source -name '*.tar.gz' -exec tar -xvzf {} -C $destination \;

The command above will extract every file with the *.tar.gz extension under $source into the $destination location. If the path is passed via dbutils.widgets, or set statically in %scala or %pyspark, it must be declared as an environment variable before %sh can see it. In %pyspark this can be done as follows:

import os

# note: no stray spaces inside the key, or $source will not resolve in %sh
os.environ['source'] = '/dbfs/mnt/dl/raw/source/'
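The find command above also references $destination, so that variable presumably needs to be exported the same way (the original answer only sets source; the path below is a hypothetical placeholder):

# hypothetical destination path -- adjust to your own mount layout
os.environ['destination'] = '/dbfs/mnt/dl/raw/destination/'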

Assuming the content is in *.csv files, use the following to load a file:

DF = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .option("mode", "DROPMALFORMED") \
    .load('/mnt/dl/raw/source/sample.csv')
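To get back to the original goal of filtering out b.tsv, one possible follow-up (a sketch, not part of the original answer; the glob path is an assumption about where the archive was extracted) is to load all the extracted .tsv files at once and drop the metadata file by filename:

from pyspark.sql.functions import input_file_name

# load every extracted .tsv file in one pass
df = spark.read.format('csv') \
    .option('sep', '\t') \
    .load('/mnt/dl/raw/source/*.tsv') \
    .withColumn('filename', input_file_name())

# keep the record files, dropping the static metadata in b.tsv
records = df.filter(~df.filename.endswith('b.tsv'))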

