在Pyspark中读取tar.gz存档时使用特定模式过滤文件 [英] Filtering files using specific pattern when reading tar.gz archive in Pyspark

查看:91
本文介绍了在Pyspark中读取tar.gz存档时使用特定模式过滤文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的文件夹 myfolder.tar.gz 中有多个CSV文件.我以这种方式创建的方法:首先将我的所有文件放在文件夹名称 myfolder 中,然后为其准备一个 tar 文件夹.然后准备该tar文件夹的 .gz .

I have multiple CSV files in my folder myfolder.tar.gz. Which I created in this way: first put all my files in a folder name myfolder then prepare a tar folder of it. Then prepare .gz of that tar folder.

让我们说我们有5个文件.

Let us say we have 5 files.

abc_1.csv
abc_2.csv
abc_3.csv
def_1.csv
def_2.csv

我想使用Pyspark数据框以特定的文件名模式过滤读取的文件.就像我们要一起读取所有 abc 文件一样.

I want to filter read files in a specific filename pattern using Pyspark data frame. Like we want to read all abc files together.

这不应给我们 def 的结果,反之亦然.目前,我可以使用 spark.read.csv()函数一起读取所有CSV文件.另外,当我使用 pathGlobalFilter 这样的参数将文件保存在简单的文件夹中时,就可以过滤文件:

This should not give us the results from def and vice versa. Currently, I am able to read all the CSV files together by just using spark.read.csv() function. Also, I am able to filter file when I keep the files in a simple folder using pathGlobalFilter parameter like this:

df = spark.read.csv("mypath",pathGlobalFilter="def_[1-9].csv")

但是当我能够在 tar.gz 中执行相同操作时,例如:

But when I am able to do the same in tar.gz, like:

df = spark.read.csv("myfolder.tar.gz", pathGlobalFilter="def_[1-9].csv")

我遇到错误:

无法推断CSV的架构.如何从.tar.gz文件读取.

Unable to infer Schema for CSV. How to read from .tar.gz file.

推荐答案

基于此帖子,您可以阅读 .tar.gz 文件作为 binaryFile ,然后使用python tarfile ,您可以提取存档成员并使用正则表达式 def_过滤文件名[1-9] .结果是rdd,您可以将其转换为数据帧:

Based on this post, you can read the .tar.gz file as binaryFile then using python tarfile you can extract the archive members and filter on file names using the regex def_[1-9]. The result is an rdd that you can convert into a data frame :

import re
import tarfile
from io import BytesIO

# extract only the files with which math regex 'def_[1-9].csv'
def extract_files(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if re.match(r"def_[1-9].csv", x.name)]

# read binary file and convert to df
rdd = sc.binaryFiles("/path/myfolder.tar.gz") \
        .mapValues(extract_files) \
        .flatMap(lambda row: [x.decode("utf-8").split("\n") for x in row[1]])\
        .flatMap(lambda row: [e.split(",") for e in row])

df = rdd.toDF(*csv_cols)

这篇关于在Pyspark中读取tar.gz存档时使用特定模式过滤文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆