Reading multiple files from S3 in Spark by date period


Question

I have an application which sends data to AWS Kinesis Firehose, and Firehose writes the data into my S3 bucket. Firehose uses the "yyyy/MM/dd/HH" format to write the files.

Like in this sample S3 path:

s3://mybucket/2016/07/29/12
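
For reference, a prefix in this layout can be rebuilt from a timestamp with the same pattern; the following is a minimal sketch using java.time, with the bucket name and hour taken from the example above:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Sketch: build the Firehose-style "yyyy/MM/dd/HH" prefix for a given hour.
val hourFormatter = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")
val hour = LocalDateTime.of(2016, 7, 29, 12, 0)
val hourPath = s"s3://mybucket/${hour.format(hourFormatter)}"
// hourPath == "s3://mybucket/2016/07/29/12"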

Now I have a Spark application written in Scala, where I need to read data from a specific time period. I have start and end dates. The data is in JSON format, which is why I use sqlContext.read.json() rather than sc.textFile().

How can I read the data quickly and efficiently?

1. Wildcards - I can select the data from all hours of a specific date, or from all dates of a specific month, for example:

val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")

But if I have to read data from a date period spanning a few days, for example 2016-07-29 to 2016-07-30, I cannot use the wildcard approach in the same way.

Which brings me to my next point...

2. Union - A second solution from the previous link by cloud suggests reading each directory separately and then unioning them together. Although he suggests unioning RDDs, there is the option to union DataFrames as well. If I generate the date strings from the given date period manually, then I may create a path that does not exist, and instead of ignoring it, the whole read fails. Instead, I could use the AWS SDK and the listObjects function from AmazonS3Client to get all the keys, as in iMKanchwala's solution from the previous link.
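
A rough sketch of that union approach, assuming the AWS SDK for Java v1 and the sqlContext from the question (the bucket name and the day prefix are placeholders):

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.collection.JavaConverters._

// Sketch only: list what actually exists under one day prefix.
// Note: listObjects returns at most 1000 keys per call; larger prefixes need paging.
val s3Client = AmazonS3ClientBuilder.defaultClient()
val existingKeys = s3Client
  .listObjects("mybucket", "2016/07/29/")
  .getObjectSummaries.asScala
  .map(_.getKey)

// Derive the distinct hour directories and read each one, then union the DataFrames.
// unionAll (Spark 1.x) expects the schemas to line up, which is exactly the concern below.
val hourDirs = existingKeys.map(_.split("/").take(4).mkString("/")).distinct
val dfs = hourDirs.map(dir => sqlContext.read.json(s"s3://mybucket/$dir"))
val df = dfs.reduce(_ unionAll _)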

The only problem is that my data is constantly changing. If the read.json() function gets all the data as a single parameter, it reads all the necessary data and is smart enough to infer the JSON schema from the data. If I read two directories separately and their schemas don't match, then I think unioning those two DataFrames becomes a problem.

3. Glob(?) syntax - This solution by nhahtdh is a little better than options 1 and 2, because it provides the option to specify dates and directories in more detail and as a single "path", so it also works with read.json().

But again, the familiar problem with missing directories comes up. Let's say I want all the data from 20.07 to 30.07; I can declare it like this:

val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")

But if I am missing data from, let's say, the 25th of July, then the path ..16/07/25/ does not exist and the whole function fails.

And obviously it gets more difficult when the requested period is, for example, 25.11.2015-12.02.2016; then I would need to programmatically (in my Scala script) create a string path something like this:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

And by creating it, I would need to somehow be sure that these 25-30 and 01-12 intervals all have corresponding paths; if one is missing, it fails again. (Fortunately, the asterisk deals with missing directories, as it reads everything that exists.)
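
One way to make sure of that, again sketched with the AWS SDK for Java v1 (the bucket name and the candidate days are placeholders), is to ask S3 for at most one key under each candidate day prefix and drop the days that come back empty:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsRequest

// Sketch: keep only the day prefixes that actually contain at least one object.
val s3Client = AmazonS3ClientBuilder.defaultClient()

def prefixExists(bucket: String, prefix: String): Boolean = {
  val request = new ListObjectsRequest()
    .withBucketName(bucket)
    .withPrefix(prefix)
    .withMaxKeys(1) // one key is enough to prove the prefix exists
  !s3Client.listObjects(request).getObjectSummaries.isEmpty
}

val candidateDays = Seq("2015/11/25/", "2015/11/26/", "2015/11/27/") // ...generated from the date range
val existingDays = candidateDays.filter(day => prefixExists("mybucket", day))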

How can I read all the necessary data with a single directory path at once, without it failing because a directory is missing somewhere in between the date intervals?

Recommended answer

There is a much simpler solution. If you look at the DataFrameReader API, you'll notice that there is a .json(paths: String*) method. Just build a collection of the paths you want, with globs or not, as you prefer, and then call the method, e.g.,

val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)
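
For instance, a minimal sketch of building one wildcard path per day of the requested period with java.time (the bucket name and the date range are placeholders taken from the question):

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Sketch: one wildcard path per day in the requested period, then a single read.
// Days with no data in S3 may still need to be filtered out first (for example
// with a listObjects existence check), depending on how the reader treats
// non-existent paths.
val dayFormatter = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val start = LocalDate.of(2015, 11, 25)
val end = LocalDate.of(2016, 2, 12)

val dayPaths: Seq[String] = Iterator
  .iterate(start)(_.plusDays(1))
  .takeWhile(d => !d.isAfter(end))
  .map(day => s"s3://mybucket/${day.format(dayFormatter)}/*")
  .toSeq

val df = sqlContext.read.json(dayPaths: _*)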

