Getting an Apache Spark dataframe in the right format


Problem Description

I am trying to convert some input to the format I want in a Spark dataframe. The input I have is a Sequence of this case class, with up to 10,000,000 instances (or possibly also the JSON string before I convert it to the case class..):

case class Element(name: String, value: Int, time: Int)

As a result I want a dataframe like this:

|Time | ParamA | ParamB  | ParamC | Param 10,000 |
|1000 | 432432 | 8768768 | Null   | 75675678622  |
|2000 | Null   | Null    | 734543 | Null         |

....

So not every parameter has to be defined for every time slot. Missing values should be filled with Null, and there will probably be about 10,000 parameters and around 1,000 time slots.
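
For illustration, a few of the non-null cells above would correspond to input elements roughly like this (values taken from the table; the remaining millions of elements are omitted):

val elements = Seq(
  Element("ParamA", 432432, 1000),
  Element("ParamB", 8768768, 1000),
  Element("ParamC", 734543, 2000)
  // ... plus every other parameter/time combination that actually has a value
)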

The way I am doing it right now seems very inefficient:

import org.apache.spark.rdd.RDD

case class GroupedObjects(time: Int, params: (String, Int)*)

//elements contains the Seq[Element] from above
val elementsRdd: RDD[Element] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
  .groupBy(element => element.time)
  .map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
    (element.name, element.value)).toSeq: _*))

//transforming back to a JSON string, only to read it in again as a DataFrame
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
  "{\"time\":" + obj.time + obj.params.map(tuple => 
     ",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)

The problem I see here is definitely the conversion back to a String, only to read it in again in the right format. I would be really glad for any help showing me how to get the input case classes into the wanted dataframe format. The way I am doing it right now is really slow, and I get a heap size exception for 10,000,000 input lines.

Recommended Answer

You might try to build Row objects and define the RDD schema manually, something like the following example:

// These extra imports will be required if you don't have them already
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

//elements contains the seq of Element
val elementsRdd = sc.parallelize(elements)

val columnNames = elementsRdd.map(_.name).distinct().collect().sorted

val pivoted = elementsRdd.groupBy(_.time).map {
  case (time, elemsByTime) =>
    val valuesByColumnName = elemsByTime.groupBy(_.name).map {
      case (name, elemsByTimeAndName) => (name, elemsByTimeAndName.map(_.value).sum)
    }
    val allValuesForRow = columnNames.map(valuesByColumnName.getOrElse(_, null))
    (time, allValuesForRow)
}

val schema = StructType(
  StructField("Time", IntegerType) ::
    columnNames.map(columnName => StructField(columnName, IntegerType, nullable = true)).toList
)
val rowRDD = pivoted.map(p => Row.fromSeq(p._1 :: p._2.toList))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show(10)
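
Note that the inner groupBy sums the values whenever the same parameter name appears more than once within a time slot, and getOrElse fills in null for missing parameters, which shows up as NULL in the nullable IntegerType columns. If summing duplicates is not what you want, you could for example keep the first value with elemsByTimeAndName.map(_.value).head instead of .sum.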

I tried this locally with 10,000,000 elements like this:

val elements = (1 to 10000000).map(i => Element("Param" + (i % 1000).toString, i + 100, i % 10000))

And it completes successfully in a reasonable time.
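
As a side note, if you are on Spark 1.6 or newer, the built-in pivot on grouped DataFrames can do the same reshaping without building the schema by hand. This is only a minimal sketch, assuming the same sc, sqlContext and elements as above; also note that pivoting without an explicit list of values is capped by spark.sql.pivotMaxValues (10,000 by default), so 10,000 distinct parameters sits right at that limit:

import sqlContext.implicits._

val elementsDF = sc.parallelize(elements).toDF()   // columns: name, value, time

val pivotedDF = elementsDF
  .groupBy("time")
  .pivot("name")    // one output column per distinct parameter name
  .sum("value")     // same aggregation as the manual version above
  .orderBy("time")

pivotedDF.show(10)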
