Efficient way of reading parquet files between a date range in Azure Databricks

Problem Description

I would like to know if the pseudocode below is an efficient way to read multiple parquet files between a date range, stored in Azure Data Lake, from PySpark (Azure Databricks). Note: the parquet files are not partitioned by date.

I'm using the uat/EntityName/2019/01/01/EntityName_2019_01_01_HHMMSS.parquet convention for storing data in ADL, as suggested in the book Big Data by Nathan Marz, with a slight modification (using 2019 instead of year=2019).

Read all data using * wildcard:

df = spark.read.parquet("uat/EntityName/*/*/*/*")

Add a column FileTimestamp that extracts the timestamp from EntityName_2019_01_01_HHMMSS.parquet using string operations and converts it to TimestampType():

df.withColumn(add timestamp column)

Use filter to get relevant data:

start_date = '2018-12-15 00:00:00'
end_date = '2019-02-15 00:00:00'
df.filter(df.FileTimestamp >= start_date).filter(df.FileTimestamp < end_date)
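
For reference, a minimal sketch of how those three steps could be written in PySpark. The use of input_file_name(), the regular expression, and the timestamp format are assumptions based on the stated EntityName_YYYY_MM_DD_HHMMSS.parquet naming convention, not code from the question itself:

from pyspark.sql import functions as F

# Read everything under the entity root (same wildcard as above).
df = spark.read.parquet("uat/EntityName/*/*/*/*")

# Derive FileTimestamp from the name of the file each row was read from,
# e.g. .../EntityName_2019_01_01_103000.parquet -> 2019-01-01 10:30:00.
df = df.withColumn(
    "FileTimestamp",
    F.to_timestamp(
        F.regexp_extract(F.input_file_name(), r"_(\d{4}_\d{2}_\d{2}_\d{6})\.parquet$", 1),
        "yyyy_MM_dd_HHmmss",
    ),
)

# Keep only the requested date range.
start_date = "2018-12-15 00:00:00"
end_date = "2019-02-15 00:00:00"
df = df.filter((F.col("FileTimestamp") >= start_date) & (F.col("FileTimestamp") < end_date))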

Essentially I'm using PySpark to simulate the neat syntax available in U-SQL:

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
  USING Extractors.Csv();

@rs = 
  SELECT * 
  FROM @rs
  WHERE 
    date >= System.DateTime.Parse("2016/1/1") AND
    date < System.DateTime.Parse("2016/2/1");

Solution

The correct way of partitioning your data is to use folders of the form year=2019, month=01, etc. in your storage layout.

When you query this data with a filter such as:

df.filter(df.year >= myYear)

Then Spark will only read the relevant folders.
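
One way to see that pruning is to look at the physical plan: for a filtered scan, the folder-level predicates appear under PartitionFilters. A small sketch (the path uat/EntityName_partitioned is hypothetical; the sketch after the next paragraph shows how such a partitioned copy could be written):

# Hypothetical path to a copy of the data partitioned by year/month/day.
df_p = spark.read.parquet("uat/EntityName_partitioned")

# The plan should show the condition under "PartitionFilters",
# i.e. only the matching folders are listed and scanned.
df_p.filter(df_p.year >= 2019).explain()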

It is very important that the filtering column name appears exactly in the folder name. Note that when you write partitioned data using Spark (for example by year, month, day) it will not write the partitioning columns into the parquet file. They are instead inferred from the path. It does mean your dataframe will require them when writing though. They will also be returned as columns when you read from partitioned sources.
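
A sketch of that round trip, assuming the FileTimestamp column from the earlier sketch and an illustrative output path:

from pyspark.sql import functions as F

# Write: year/month/day become folders such as .../year=2019/month=1/day=1/
# and are not stored inside the parquet files themselves.
(df.withColumn("year", F.year("FileTimestamp"))
   .withColumn("month", F.month("FileTimestamp"))
   .withColumn("day", F.dayofmonth("FileTimestamp"))
   .write.partitionBy("year", "month", "day")
   .mode("overwrite")
   .parquet("uat/EntityName_partitioned"))

# Read: the partition columns are inferred from the folder names and
# come back as ordinary columns on the dataframe.
df_p = spark.read.parquet("uat/EntityName_partitioned")
df_p.select("year", "month", "day").show(3)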

If you cannot change the folder structure you can always manually reduce the folders for Spark to read using a regex or glob - this article should provide more context: Spark SQL queries on partitioned data using Date Ranges. But clearly this is more manual and complex.

UPDATE: Further example: Can I read multiple files into a Spark Dataframe from S3, passing over nonexistent ones?
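
As an illustration of the manual approach, a sketch in which the day_paths helper and its parameters are assumptions rather than anything from the linked posts: build one folder path per day of the range and pass only those to the reader.

from datetime import date, timedelta

def day_paths(start, end, root="uat/EntityName"):
    """One path per day in [start, end), matching the existing yyyy/MM/dd folder layout."""
    paths, current = [], start
    while current < end:
        paths.append(f"{root}/{current:%Y/%m/%d}/*.parquet")
        current += timedelta(days=1)
    return paths

# Spark lists and reads only these folders. Note that a path which does not
# exist will raise an error; handling that is what the S3 question above covers.
paths = day_paths(date(2018, 12, 15), date(2019, 2, 15))
df = spark.read.parquet(*paths)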

Also from "Spark - The Definitive Guide: Big Data Processing Made Simple" by Bill Chambers:

Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. What this allows you to do is skip lots of data when you go to read it in later, allowing you to read in only the data relevant to your problem instead of having to scan the complete dataset. ...

This is probably the lowest-hanging optimization that you can use when you have a table that readers frequently filter by before manipulating. For instance, date is particularly common for a partition because, downstream, often we want to look at only the previous week’s data (instead of scanning the entire list of records).
