Reading multiple files from S3 in Spark by date period


Question

I have an application that sends data to AWS Kinesis Firehose, which writes the data into my S3 bucket. Firehose uses the "yyyy/MM/dd/HH" format to write the files.

Like in this sample S3 path:

s3://mybucket/2016/07/29/12

Now I have a Spark application written in Scala, where I need to read data from a specific time period. I have start and end dates. The data is in JSON format, which is why I use sqlContext.read.json() rather than sc.textFile().

How can I read the data quickly and efficiently?

  1. Wildcards - I can select the data from all hours of a specific date or all dates of a specific month, for example:

val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")

But if I have to read data from a date period of a few days, for example 2016-07-29 to 2016-07-30, I cannot use the wildcard approach in the same way.

Which brings me to my next point...

  2. Union - A second solution from the previous link, by cloud, suggests reading each directory separately and then unioning them together. Although he suggests unioning RDDs, there's an option to union DataFrames as well. If I generate the date strings from the given date period manually, then I may create a path that does not exist, and instead of being ignored, the whole read fails. Instead, I could use the AWS SDK and the listObjects function from AmazonS3Client to get all the keys, as in iMKanchwala's solution from the previous link.

The only problem is that my data is constantly changing. If the read.json() function gets all the data as a single parameter, it reads all the necessary data and is smart enough to infer the JSON schema from it. If I read two directories separately and their schemas don't match, then I think unioning those two DataFrames becomes a problem.
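
A minimal sketch of that listObjects idea, assuming the AWS SDK for Java (v1) is on the classpath and a Spark 1.x sqlContext; the bucket name, the candidate prefixes, and the prefixExists helper are illustrative placeholders, not part of the linked solution:

import com.amazonaws.services.s3.AmazonS3Client

// Credentials come from the default provider chain; bucket and prefixes are placeholders.
val s3 = new AmazonS3Client()

// Hypothetical helper: is there at least one object under the given prefix?
def prefixExists(bucket: String, prefix: String): Boolean =
  !s3.listObjects(bucket, prefix).getObjectSummaries.isEmpty

// Keep only the date directories that actually exist, read each one, and union them.
// As noted above, this still assumes every directory yields the same inferred schema.
val candidates = Seq("2016/07/29/", "2016/07/30/")
val existing = candidates.filter(p => prefixExists("mybucket", p))
val frames = existing.map(p => sqlContext.read.json(s"s3://mybucket/$p*"))
val df = frames.reduce((a, b) => a.unionAll(b))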

  3. Glob(?) syntax - This solution by nhahtdh is a little better than options 1 and 2 because it provides the option to specify dates and directories in more detail, as a single "path", so it also works with read.json().

But again, the familiar problem with missing directories comes up. Let's say I want all the data from 20.07 to 30.07; I can declare it like this:

val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")

But if I am missing data from, let's say, the 25th of July, then the path ..16/07/25/ does not exist and the whole function fails.

And obviously it gets more difficult when the requested period is, for example, 25.11.2015-12.02.2016; then I would need to programmatically (in my Scala script) create a string path something like this:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

And by creating it, I would need to somehow make sure that these 25-30 and 01-12 intervals all have corresponding paths; if one is missing, it fails again. (The asterisk fortunately deals with missing directories, as it reads everything that exists.)

How can I read all of the necessary data from such directory paths at once, without failing because a directory somewhere inside the date interval is missing?

Answer

There is a much simpler solution. If you look at the DataFrameReader API, you'll notice that there is a .json(paths: String*) method. Just build a collection of the paths you want, with globs or not as you prefer, and then call the method, e.g.,

val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)
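
For example, a rough usage sketch of that method, assuming a Spark 1.x sqlContext and hypothetical start/end dates; the day-level layout matches the "yyyy/MM/dd/HH" prefixes described above:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical period; adjust to the actual start and end dates.
val start = LocalDate.of(2015, 11, 25)
val end = LocalDate.of(2016, 2, 12)
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")

// One glob per day in the period, so no hand-built bracket ranges are needed.
val paths: Seq[String] =
  Iterator.iterate(start)(_.plusDays(1))
    .takeWhile(!_.isAfter(end))
    .map(d => s"s3://mybucket/${d.format(fmt)}/*")
    .toSeq

val df = sqlContext.read.json(paths: _*)

If an entirely missing day still makes the read fail in your Spark version, the paths can first be filtered with an existence check such as the listObjects sketch shown earlier.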
