Finding average value in spark scala gives blank result
Problem Description
I have an input.txt file. The data looks as below.
1 1383260400000 0 0.08136262351125882
1 1383260400000 39 0.14186425470242922 0.1567870050390246 0.16093793691701822 0.052274848528573205 11.028366381681026
1 1383261000000 0 0.13658782275823106 0.02730046487718618
1 1383261000000 33 0.026137424264286602
2241 1383324600000 0 0.16869936142032646
2241 1383324600000 39 0.820500491400199 0.6518011299798726 1.658248219576473 3.4506242774863045 36.71096470849049
2241 1383324600000 49 0.16295028249496815
Assume the first column is id and the other columns are col1, col2, col3, col4, col5, col6 and col7 respectively. I want to find the average of col7 for each id; basically I want my results in id, avg of col7 format.
This is the code I have tried so far. I read my data from the txt file, then I created a schema.
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))
Then I created a data frame.
import scala.util.Try
import org.apache.spark.sql.Row

val data = text.map(line => line.split("\\t")).map(arr => Row.fromSeq(Seq(
  arr(0).toInt,
  Try(arr(1).asInstanceOf[DoubleType]) getOrElse(0.0),
  Try(arr(2).toInt) getOrElse(0),
  Try(arr(3).toDouble) getOrElse(0.0),
  Try(arr(4).toDouble) getOrElse(0.0),
  Try(arr(5).toDouble) getOrElse(0.0),
  Try(arr(6).toDouble) getOrElse(0.0),
  Try(arr(7).asInstanceOf[DoubleType]) getOrElse(0.0)
)))
val df = sqlContext.createDataFrame(data, schema)
Finally, I group by id and save the result in a txt file.
val res1 = df.groupBy("ID").agg(avg("col7"))
res1.rdd.saveAsTextFile("/stuaverage/spoutput12")
When I run this I get several files with blank results, e.g.
[1068,0.0]
[1198,0.0]
[1344,0.0]
[1404,0.0]
[1537,0.0]
[1675,0.0]
[1924,0.0]
[193,0.0]
[211,0.0]
[2200,0.0]
[2225,0.0]
[2663,0.0]
[2888,0.0]
[3152,0.0]
[3235,0.0]
The first column is correct. But for the second column I should get a value (although values are missing for some rows).
Please help.
Recommended Answer
The 0.0 values come from arr(1).asInstanceOf[DoubleType] and arr(7).asInstanceOf[DoubleType]: asInstanceOf does not parse the string, it attempts to cast it to the Spark type DoubleType, which always fails, so the surrounding Try always falls back to its 0.0 default (parsing would need .toDouble). Rather than splitting lines by hand, I would suggest you use the sqlContext API and the schema you have defined:
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\\t")
  .schema(schema)
  .load("path to your text file")
where the schema is
val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))
After that, all you need is to apply the avg function on the grouped dataframe:
import org.apache.spark.sql.functions._
val res1 = df.groupBy("ID").agg(avg("col1"),avg("col2"),avg("col3"),avg("col4"),avg("col5"),avg("col6"),avg("col7"))
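If you only need the id and the average of col7, as the question asks, a single aggregation is enough (the avg_col7 alias below is just an illustrative name for a cleaner output column):

val res1 = df.groupBy("ID").agg(avg("col7").alias("avg_col7"))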
Finally, you can save directly to csv from the dataframe. You don't need to convert to rdd:
res1.coalesce(1).write.csv("/stuaverage/spoutput12")
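Note that coalesce(1) pulls the result into a single partition, so you get one output file instead of one part file per partition; drop it if the result is large. If you also want a header row, write.option("header", "true") is a standard DataFrameWriter option in Spark 2.x:

res1.coalesce(1).write.option("header", "true").csv("/stuaverage/spoutput12")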