Getting an Apache Spark dataframe in the right format

Problem description

I am trying to convert some input into the format I want in a Spark dataframe. The input I have is a Sequence of the following case class, with up to 10,000,000 instances (or possibly the JSON string before I convert it to the case class):

case class Element(name: String, value: Int, time: Int)

As a result I want a dataframe like this:

|Time | ParamA | ParamB  | ParamC | ... | Param 10,000 |
|1000 | 432432 | 8768768 | Null   | ... | 75675678622  |
|2000 | Null   | Null    | 734543 | ... | Null         |

....
So not every parameter has to be defined for every time slot. Missing values should be filled with Null. And there will probably be around 10,000 parameters and around 1,000 time slots.
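For illustration, here is a hypothetical three-element input that would fill the first few cells of the table above (the values are taken from the table; the val name is only for the example):

// Hypothetical sample input covering the first few cells of the table above
val elements = Seq(
  Element("ParamA", 432432, 1000),
  Element("ParamB", 8768768, 1000),
  Element("ParamC", 734543, 2000)
)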

The way I do it right now seems to be very bad in terms of efficiency:

case class Element(name: String, value: Int, time: Int)

case class GroupedObjects(time: Int, params: (String, Int)*)

// elements contains the Seq[Element] from above
val elementsRdd: RDD[Element] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
  .groupBy(element => element.time)
  .map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
    (element.name, element.value)).toSeq: _*))

// transforming back to a JSON string, only to read it in again in the right format
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
  "{\"time\":" + obj.time + obj.params.map(tuple =>
    ",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)

The problem I see here is definitely the conversion back to a String, only to read it in again in the right format. I would be really glad for any help showing me how to get from the input case classes to the desired dataframe format.
With the way I am doing it right now, it is really slow and I get a heap-size exception for 10,000,000 input lines.

Answer

You might try to build Row objects and define the RDD schema manually, something like the following example:

// These extra imports will be required if you don't have them already
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// elements contains the Seq[Element] from the question
val elementsRdd = sc.parallelize(elements)

// Collect the distinct parameter names to the driver; they become the column names
val columnNames = elementsRdd.map(_.name).distinct().collect().sorted

// Group by time and build one value slot per column name
val pivoted = elementsRdd.groupBy(_.time).map {
  case (time, elemsByTime) =>
    val valuesByColumnName = elemsByTime.groupBy(_.name).map {
      case (name, elemsByTimeAndName) => (name, elemsByTimeAndName.map(_.value).sum)
    }
    // Parameters missing for this time slot become null
    val allValuesForRow = columnNames.map(valuesByColumnName.getOrElse(_, null))
    (time, allValuesForRow)
}

val schema = StructType(
  StructField("Time", IntegerType) ::
    columnNames.map(columnName => StructField(columnName, IntegerType, nullable = true)).toList)
val rowRDD = pivoted.map(p => Row.fromSeq(p._1 :: p._2.toList))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show(10)
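Collecting the distinct names to the driver first is what guarantees that every row is built with the same column order as the schema. Note also that the inner groupBy-and-sum aggregates any duplicate (name, time) pairs instead of failing on them.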

I tried this locally with 10,000,000 elements like this:

// 1,000 distinct parameter names spread over 10,000 time slots
val elements = (1 to 10000000).map(i => Element("Param" + (i % 1000).toString, i + 100, i % 10000))

And it completes successfully in a reasonable time.
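As an aside, if your Spark version is 1.6 or later, the DataFrame API also has a built-in pivot that collapses the manual steps above into one. A minimal sketch, assuming the same Element case class and that the number of distinct names stays within spark.sql.pivotMaxValues (10,000 by default):

import sqlContext.implicits._ // enables .toDF() on an RDD of case classes

// Spark 1.6+: pivot("name") turns each distinct name into its own column.
// Passing the collected column names to pivot() explicitly would save an
// extra pass over the data.
val pivotedDf = elementsRdd.toDF()
  .groupBy("time")
  .pivot("name")
  .sum("value")
  .orderBy("time")
pivotedDf.show(10)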
