Convert RDD to Dataframe in Spark/Scala


Problem description

The RDD has been created in the format Array[Array[String]] and has the following values:

 Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1))

I want to create a DataFrame with the schema:

val schemaString = "callId oCallId callTime duration calltype swId"

Next steps:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

Gives the following error:

console:45: error: overloaded method value createDataFrame with alternatives:
  (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame
  (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[Array[String]], org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)

Solution

Just paste into a spark-shell:

val a = 
  Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, 
  callTime: String, duration: String, calltype: String, swId: String)

Then map() over the RDD to create instances of the case class, and then create the DataFrame using toDF():

scala> val df = rdd.map { 
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = 
  [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string]

This infers the schema from the case class.

Then you can proceed with:

scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+
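
Alternatively, if you want to keep the schemaString approach from the question, note what the error message asks for: createDataFrame wants an RDD[Row] together with a StructType. A minimal sketch of that route, assuming the same rdd and sqlContext as above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Build a StructType from the space-separated field names;
// every column is a nullable string.
val schemaString = "callId oCallId callTime duration calltype swId"
val schema = StructType(
  schemaString.split(" ").map(name => StructField(name, StringType, nullable = true)))

// createDataFrame needs an RDD[Row], not an RDD[Array[String]],
// so wrap each Array[String] in a Row first.
val rowRDD = rdd.map(arr => Row.fromSeq(arr))

val calDF = sqlContext.createDataFrame(rowRDD, schema)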

If you want to use toDF() in a normal program (not in the spark-shell), make sure (quoted from https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache-spark/td-p/29878):

  • Import sqlContext.implicits._ right after creating the SQLContext
  • Define the case class outside of the method that uses toDF() (see the sketch below)
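
A minimal standalone sketch that follows both rules (the object and application names are illustrative, not from the original):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// The case class is defined at the top level, outside the method that calls toDF().
case class X(callId: String, oCallId: String,
  callTime: String, duration: String, calltype: String, swId: String)

object CallsToDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CallsToDF"))
    val sqlContext = new SQLContext(sc)
    // Import the implicits right after creating the SQLContext,
    // so .toDF() becomes available on RDDs of case classes.
    import sqlContext.implicits._

    val rdd = sc.makeRDD(Seq(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
    val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) =>
      X(s0, s1, s2, s3, s4, s5) }.toDF()
    df.show()

    sc.stop()
  }
}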
