Spark SQL queries on partitioned data using Date Ranges

Question

My dataset is partitioned in this way:

Year=yyyy
 |---Month=mm
 |   |---Day=dd
 |   |   |---<parquet-files>

What is the easiest and most efficient way to create a DataFrame in Spark loaded with data between two dates?

Answer

If you absolutely have to stick with this partitioning strategy, the answer depends on whether you are willing to bear the cost of partition discovery.

If you are willing to have Spark discover all partitions, which only needs to happen once (until you add new files), you can load the base path and then filter using the partition columns.

If you do not want Spark to discover all the partitions, e.g., because you have millions of files, the only efficient general solution is to break the interval you want to query into several sub-intervals you can easily query for using @r0bb23's approach, and then union them together.

If you want the best of both cases above and you have a stable schema, you can register the partitions in the metastore by defining an external partitioned table. Don't do this if you expect your schema to evolve, as metastore-managed tables handle schema evolution quite poorly at this time.
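As a rough sketch of that route, not the answer's exact DDL (it assumes Hive support is enabled, and the table name events and its data columns are hypothetical placeholders for whatever the Parquet files actually contain):

// Register the existing Year=/Month=/Day= layout as an external partitioned table.
// Hypothetical table/column names; adjust to the real Parquet schema.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS events (id BIGINT, payload STRING)
  PARTITIONED BY (Year INT, Month INT, Day INT)
  STORED AS PARQUET
  LOCATION 'hdfs:///basepath'
""")

// Make the metastore aware of the partition directories that already exist on HDFS.
spark.sql("MSCK REPAIR TABLE events")

// spark.table("events") can now be filtered on Year/Month/Day with partition pruning,
// using the same predicates as in the full-discovery example below.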

For example, to query between 2017-10-06 and 2017-11-03 you'd do:

import spark.implicits._  // enables the 'Year / 'Month / 'Day column syntax

// With full discovery: load the base path and prune on the partition columns
spark.read.parquet("hdfs:///basepath")
  .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6) || ('Month === 11 && 'Day <= 3)
  ))

// With partial discovery: list only the matching Day=... directories via path globs
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)

Writing generic code for this is certainly possible, but I haven't encountered it. The better approach is to partition in the manner outlined in the comment I made on the question. If your table was partitioned using something like /basepath/ts=yyyymmddhhmm/*.parquet, then the answer is simply:

spark.read.parquet("hdfs:///basepath")
  .where('ts >= 201710060000L && 'ts <= 201711030000L)

The reason it's worth adding hours & minutes is that you can then write generic code that handles intervals regardless of whether the data is partitioned by week, day, hour, or every 15 minutes. In fact, you can even manage data of different granularities in the same table, e.g., older data aggregated at higher levels to reduce the total number of partitions that need to be discovered.
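A minimal sketch of what such generic code could look like, assuming the ts=yyyymmddhhmm layout above (the helper name loadInterval and the base path are illustrative, not part of the answer):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Illustrative helper: turn any [from, to] interval into a filter on the ts partition column.
def loadInterval(basePath: String, from: LocalDateTime, to: LocalDateTime): DataFrame = {
  val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmm")
  spark.read.parquet(basePath)
    .where(col("ts") >= from.format(fmt).toLong && col("ts") <= to.format(fmt).toLong)
}

// e.g., the interval from the example above:
// loadInterval("hdfs:///basepath", LocalDateTime.of(2017, 10, 6, 0, 0), LocalDateTime.of(2017, 11, 3, 0, 0))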
