Spark - Reading JSON from Partitioned Folders using Firehose


Problem Description


Kinesis Firehose manages the persistence of files, in this case time-series JSON, into a folder hierarchy that is partitioned by YYYY/MM/DD/HH (down to the hour, in 24-hour numbering)... great.

Using Spark 2.0, how can I then read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' for the DataFrame reader?

My next goal is for this to become a streaming DataFrame, where new files persisted by Firehose into S3 naturally become part of the streaming DataFrame using the new Structured Streaming in Spark 2.0. I know this is all experimental - hoping someone has used S3 as a streaming file source before, where the data is partitioned into folders as described above. Of course I would prefer to read straight off a Kinesis stream, but there is no date for this connector for 2.0, so Firehose->S3 is the interim.

NB: I am using Databricks, which mounts S3 into DBFS, but this could of course just as easily be EMR or another Spark provider. It would be great to see a notebook too, if a shareable one exists that gives an example.

Cheers!

Solution

Can I read nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an option to the DataFrame reader?

Yes. Since your directory structure is regular (YYYY/MM/DD/HH), you can give a path down to the leaf nodes with wildcard characters, as shown below.

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()

val jsonDf = spark.read.json("base/path/*/*/*/*/*.json")
// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json
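
If the goal is the streaming DataFrame mentioned in the question, Structured Streaming in Spark 2.0 also has a file source that can watch an S3 prefix for new files. A minimal sketch, under the assumptions that the file source accepts the same glob pattern (otherwise point it at the base path) and that the record schema is known up front - the field names below are placeholders, since the streaming file source requires an explicit schema:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructType, TimestampType}

val spark = SparkSession.builder.master("local").getOrCreate()

// Structured Streaming's file source needs the schema up front;
// these fields are placeholders for whatever the Firehose records contain.
val schema = new StructType()
  .add("device_id", StringType)
  .add("event_time", TimestampType)
  .add("value", DoubleType)

val streamingJsonDf = spark.readStream
  .schema(schema)
  .json("base/path/*/*/*/*")   // same YYYY/MM/DD/HH layout as above

// Console sink just to confirm new files are picked up; swap in a real sink.
val query = streamingJsonDf.writeStream
  .format("console")
  .start()

query.awaitTermination()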


Of course, I would prefer to read straight off a Kinesis stream, but there is no date for this connector for 2.0, so Firehose->S3 is the interim.

There is a library for Kinesis integration with Spark Streaming, so you can read the streaming data directly and perform SQL operations on it without reading from S3.

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
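
If the project is built with sbt, the equivalent dependency line for the coordinates above would be something like:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.0.0"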

Sample code with Spark Streaming and SQL

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
 streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
 [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Kinesis records arrive as Array[Byte]; decode them to JSON strings,
  // then let Spark infer the schema (or supply one explicitly)
  val jsonDf = spark.read.json(rdd.map(bytes => new String(bytes, "UTF-8")))

  // Create a temporary view with DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")

  // As we have a DataFrame and a SparkSession object, we can perform most
  // of the Spark SQL work here, e.g. spark.sql("SELECT ... FROM json_data_tbl")
}
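
The snippet above assumes a streamingContext already exists. A minimal way to create and run it might look like the following sketch; the application name and the 10-second batch interval are arbitrary choices, not anything prescribed by the original answer:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("firehose-json-demo").setMaster("local[*]")
val streamingContext = new StreamingContext(conf, Seconds(10))

// ... create kinesisStream and register the foreachRDD logic shown above ...

streamingContext.start()
streamingContext.awaitTermination()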

