Finding average value in Spark Scala gives blank result

Question

I have an input.txt file. The data looks as below.

1   1383260400000   0   0.08136262351125882             
1   1383260400000   39  0.14186425470242922 0.1567870050390246  0.16093793691701822 0.052274848528573205    11.028366381681026
1   1383261000000   0   0.13658782275823106         0.02730046487718618 
1   1383261000000   33                  0.026137424264286602
2241    1383324600000   0   0.16869936142032646             
2241    1383324600000   39  0.820500491400199   0.6518011299798726  1.658248219576473   3.4506242774863045  36.71096470849049
2241    1383324600000   49  0.16295028249496815

Assume the first column is id and the other columns are col1, col2, col3, col4, col5, col6, and col7 respectively. I want to find the average of col7 for each id; that is, I want my results in the format id, avg(col7).

This is the code I have tried so far. I read my data from the txt file, then created a schema.

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))

Then I created a data frame.

val data = text.map(line => line.split("\\t")).map(arr => Row.fromSeq(Seq(
  arr(0).toInt,
  Try(arr(1).asInstanceOf[DoubleType]) getOrElse(0.0),
  Try(arr(2).toInt) getOrElse(0),
  Try(arr(3).toDouble) getOrElse(0.0),
  Try(arr(4).toDouble) getOrElse(0.0),
  Try(arr(5).toDouble) getOrElse(0.0),
  Try(arr(6).toDouble) getOrElse(0.0),
  Try(arr(7).asInstanceOf[DoubleType]) getOrElse(0.0)
)))

Finally, I grouped by ID, averaged col7, and saved the result as a text file.

val res1 = df.groupBy("ID").agg(avg("col7"))

res1.rdd.saveAsTextFile("/stuaverage/spoutput12")

When I run this, I get several files with blank results. e.g.

[1068,0.0]
[1198,0.0]
[1344,0.0]
[1404,0.0]
[1537,0.0]
[1675,0.0]
[1924,0.0]
[193,0.0]
[211,0.0]
[2200,0.0]
[2225,0.0]
[2663,0.0]
[2888,0.0]
[3152,0.0]
[3235,0.0]

The first column is correct. But for the second column, I should get a value (although values are missing for some rows).

Please help.

Answer

I would suggest you use the sqlContext API and the schema you have defined. As a side note, the reason your current code produces 0.0 everywhere is that arr(1).asInstanceOf[DoubleType] and arr(7).asInstanceOf[DoubleType] try to cast a String to the Spark SQL type object DoubleType; that cast always fails, so the surrounding Try falls back to getOrElse(0.0) on every row. Parsing with toDouble, as you already do for the other columns, fixes it.
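A minimal sketch of that fix, keeping your RDD-based approach and assuming the same imports as your snippet (scala.util.Try, org.apache.spark.sql.Row):

val data = text.map(line => line.split("\\t")).map(arr => Row.fromSeq(Seq(
  arr(0).toInt,
  Try(arr(1).toDouble) getOrElse(0.0), // parse the string, don't cast to the type object
  Try(arr(2).toInt) getOrElse(0),
  Try(arr(3).toDouble) getOrElse(0.0),
  Try(arr(4).toDouble) getOrElse(0.0),
  Try(arr(5).toDouble) getOrElse(0.0),
  Try(arr(6).toDouble) getOrElse(0.0),
  Try(arr(7).toDouble) getOrElse(0.0) // col7 now keeps its real value
)))

That said, the cleaner route is to read the file through the csv source directly: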

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\\t")
  .schema(schema)
  .load("path to your text file") 

with the schema as

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))
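If you happen to be on Spark 2.x, the built-in csv source does the same job without the external spark-csv package. A sketch, assuming a SparkSession named spark is available:

val df = spark.read
  .option("sep", "\t") // the built-in source calls the delimiter option "sep"
  .schema(schema)
  .csv("path to your text file")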

After that, all you need to do is apply the avg function on the grouped dataframe:

import org.apache.spark.sql.functions._
val res1 = df.groupBy("ID").agg(avg("col1"),avg("col2"),avg("col3"),avg("col4"),avg("col5"),avg("col6"),avg("col7"))
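If you only want the id, avg(col7) pair that the question asks for, a minimal variant is the following (the avg_col7 alias is just an illustrative name):

val res2 = df.groupBy("ID").agg(avg("col7").alias("avg_col7"))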

Finally, you can save to csv directly from the dataframe; you don't need to convert to an rdd:

res1.coalesce(1).write.csv("/stuaverage/spoutput12")
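If you also want column names in the output file, the csv writer accepts a header option (an optional tweak, not part of the original answer):

res1.coalesce(1).write.option("header", "true").csv("/stuaverage/spoutput12")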
