Programmatically generate the schema AND the data for a dataframe in Apache Spark


Question


I would like to dynamically generate a dataframe containing a header record for a report, so creating a dataframe from the value of the string below:

val headerDescs : String = "Name,Age,Location"

val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

However, now I want to do the same for the data (which is in effect the same data, i.e. the metadata).

I create an RDD:

val headerRDD = sc.parallelize(headerDescs.split(","))

I then intended to use createDataFrame to create it:

val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)

However, that fails because createDataFrame expects an RDD[Row], while my RDD is an RDD of strings. I can't find a way to convert my RDD to an RDD of Rows and then map the fields dynamically. The examples I've seen assume you know the number of columns beforehand, but I eventually want to be able to change the columns without changing the code - for example, by keeping the columns in a file.

Code excerpt based on first answer:

val headerDescs : String = "Name,Age,Location"

// create the schema from a string, splitting by delimiter
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

// create a row from a string, splitting by delimiter
val headerRDDRows = sc.parallelize(headerDescs.split(",")).map( a => Row(a))

val headerDf = sqlContext.createDataFrame(headerRDDRows, headerSchema)
headerDf.show()

Executing this results in:

+--------+---+--------+
|    Name|Age|Location|
+--------+---+--------+
|    Name|
|     Age|
|Location|
+--------+---+--------+

Solution

To convert an RDD[Array[String]] to an RDD[Row], you need to do the following:

import org.apache.spark.sql.Row

val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))

scala> val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
headerSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,StringType,true), StructField(Location,StringType,true))

scala> val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
headerRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:34

scala> val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
headerDf: org.apache.spark.sql.DataFrame = [Name: string, Age: string, Location: string]


scala> headerDf.printSchema
root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Location: string (nullable = true)



scala> headerDf.show
+----+---+--------+
|Name|Age|Location|
+----+---+--------+
|Name|Age|Location|
+----+---+--------+

This gives you an RDD[Row].
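Since the question specifically asks not to hard-code the number of columns, here is a minimal sketch of the same conversion done dynamically with Row.fromSeq (it assumes the same sqlContext and headerDescs string as above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Split once, then build both the schema and the single header row from the
// same array, so the number of columns is never hard-coded.
val fields = headerDescs.split(",")
val headerSchema = StructType(fields.map(name => StructField(name, StringType, true)))
val headerRDD = sc.parallelize(Seq(Row.fromSeq(fields)))
val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
headerDf.show()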

For reading from a file:

val vRDD = sc.textFile("..**filepath**.").map(_.split(",")).map(a => Row.fromSeq(a))
 
val headerDf = sqlContext.createDataFrame(vRDD, headerSchema)
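Since the question also mentions keeping the columns in a file, a sketch along the same lines could look as follows (assumptions: "header_and_data.csv" is a placeholder path, and the first line of the file holds the comma-separated column names):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// "header_and_data.csv" is a placeholder path; its first line is assumed to
// be the header, e.g. "Name,Age,Location".
val lines = sc.textFile("header_and_data.csv")
val firstLine = lines.first()

// Build the schema from the header line and the rows from the remaining lines.
val fileSchema = StructType(firstLine.split(",").map(name => StructField(name, StringType, true)))
val dataRows = lines.filter(_ != firstLine).map(l => Row.fromSeq(l.split(",")))
val df = sqlContext.createDataFrame(dataRows, fileSchema)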

Using the Spark-CSV package:

 val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .schema(headerSchema) // defining based on the custom schema
    .load("cars.csv")

OR

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("cars.csv")

There are various other options as well, which you can explore in its documentation.
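As a note on newer versions: from Spark 2.0 onwards the CSV reader is built into Spark itself, so the external package is not required. Assuming a SparkSession named spark and the same headerSchema, a roughly equivalent call would be:

val df = spark.read
    .option("header", "true")  // use the first line as the header
    .schema(headerSchema)      // or .option("inferSchema", "true") to infer types
    .csv("cars.csv")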
