Convert RDD to Dataframe in Spark/Scala

Problem description
The RDD has been created in the format `Array[Array[String]]` and has the following values:

```
Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1))
```
I want to create a DataFrame with the schema:

```scala
val schemaString = "callId oCallId callTime duration calltype swId"
```
Next steps:

```scala
scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2), p(3), p(4), p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39

scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)
```
This gives the following error:

```
<console>:45: error: overloaded method value createDataFrame with alternatives:
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[Array[String]], org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)
```
Solution

Just paste the following into a `spark-shell`:

```scala
val a = Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, callTime: String,
             duration: String, calltype: String, swId: String)
```
Then `map()` over the RDD to create instances of the case class, and create the DataFrame using `toDF()`:

```scala
scala> val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = [callId: string, oCallId: string, callTime: string, duration: string, calltype: string, swId: string]
```
This infers the schema from the case class.
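The `map` step itself is ordinary Scala pattern matching and needs no Spark at all; as a local sketch (not part of the original answer), the same transform can be tried on a plain `Array` to see what each element becomes:

```scala
// Same case class as in the answer.
case class X(callId: String, oCallId: String, callTime: String,
             duration: String, calltype: String, swId: String)

// Local data in the same shape as the RDD's elements.
val a = Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580057445", "0", "2015-07-29 10:40:37", "0", "1", "1"))

// The identical pattern match the answer runs inside rdd.map { ... }:
// each 6-element Array[String] is destructured into one X instance.
val xs = a.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }
// xs(0) == X("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")
```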
Then you can proceed with:

```
scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+
```
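The error in the question can also be fixed directly, without a case class: the `createDataFrame(rowRDD, schema)` overload wants an `RDD[Row]`, not an `RDD[Array[String]]`. A sketch of that explicit-schema route, assuming the same `rdd` and `sqlContext` from a Spark 1.x shell session:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Build a StructType from the space-separated field names, all typed as
// nullable strings to match the raw data.
val schemaString = "callId oCallId callTime duration calltype swId"
val schema = StructType(
  schemaString.split(" ").map(name => StructField(name, StringType, nullable = true)))

// Wrap each Array[String] in a Row -- the step the original attempt was
// missing, and the reason the overload resolution failed.
val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
val calDF = sqlContext.createDataFrame(rowRDD, schema)
```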
If you want to use `toDF()` in a normal program (not in the `spark-shell`), make sure (quoted from here: https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache-spark/td-p/29878):

- To `import sqlContext.implicits._` right after creating the `SQLContext`
- To define the case class outside of the method using `toDF()`
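Put together, a minimal standalone sketch of those two rules (assuming Spark 1.x on the classpath; the app name, master setting, and object name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Rule 2: the case class lives OUTSIDE the method that calls toDF() --
// defining it inside main() breaks the reflection-based schema inference.
case class X(callId: String, oCallId: String, callTime: String,
             duration: String, calltype: String, swId: String)

object RddToDf {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-to-df").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    // Rule 1: import the implicits right after creating the SQLContext;
    // this is what brings toDF() into scope on the RDD.
    import sqlContext.implicits._

    val rdd = sc.makeRDD(Seq(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
    val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
    df.printSchema()
    sc.stop()
  }
}
```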