Finding average value in Spark Scala gives blank result

Question

I have an input.txt file. The data looks as below.

1   1383260400000   0   0.08136262351125882             
1   1383260400000   39  0.14186425470242922 0.1567870050390246  0.16093793691701822 0.052274848528573205    11.028366381681026
1   1383261000000   0   0.13658782275823106         0.02730046487718618 
1   1383261000000   33                  0.026137424264286602
2241    1383324600000   0   0.16869936142032646             
2241    1383324600000   39  0.820500491400199   0.6518011299798726  1.658248219576473   3.4506242774863045  36.71096470849049
2241    1383324600000   49  0.16295028249496815

Assume the first column is id and the other columns are col1, col2, col3, col4, col5, col6 and col7 respectively. I want to find the average of col7 for each id. Basically I want my results in id, avg of col7 format.

This is the code I have tried so far. I read my data from the txt file and then created a schema.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.util.Try

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))

Then I created a data frame.

val data = text.map(line => line.split("\\t")).map(arr => Row.fromSeq(Seq(
  arr(0).toInt,
  Try(arr(1).asInstanceOf[DoubleType]) getOrElse(0.0),
  Try(arr(2).toInt) getOrElse(0),
  Try(arr(3).toDouble) getOrElse(0.0),
  Try(arr(4).toDouble) getOrElse(0.0),
  Try(arr(5).toDouble) getOrElse(0.0),
  Try(arr(6).toDouble) getOrElse(0.0),
  Try(arr(7).asInstanceOf[DoubleType]) getOrElse(0.0)
)))
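
The step that turns this RDD into the df used below is not shown in the question; presumably it is something along these lines (a sketch, assuming a Spark 1.x sqlContext):

// hypothetical: how df is presumably built from the Row RDD and the schema above
val df = sqlContext.createDataFrame(data, schema)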

Finally, I group by ID, take the average, and save the result to a txt file.

val res1 = df.groupBy("ID").agg(avg("col7"))

res1.rdd.saveAsTextFile("/stuaverage/spoutput12")

When I run this, I get several files with blank results, e.g.

[1068,0.0]
[1198,0.0]
[1344,0.0]
[1404,0.0]
[1537,0.0]
[1675,0.0]
[1924,0.0]
[193,0.0]
[211,0.0]
[2200,0.0]
[2225,0.0]
[2663,0.0]
[2888,0.0]
[3152,0.0]
[3235,0.0]

The first column is correct, but for the second column I should be getting a value (although values are missing for some rows).

Please help.

Answer

The 0.0 values come from the way you build the rows: arr(1).asInstanceOf[DoubleType] and arr(7).asInstanceOf[DoubleType] cast the String to the schema type DoubleType instead of parsing it, so the Try always fails and getOrElse(0.0) puts 0.0 in every row. You would need .toDouble there, like for the other columns.

Rather than parsing the lines by hand, I would suggest you use the sqlContext API with the schema you have defined:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\\t")
  .schema(schema)
  .load("path to your text file") 

The schema is

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))
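
If you are on Spark 2.x or later, the built-in CSV reader can do the same job without the external spark-csv package (a minimal sketch, assuming a SparkSession named spark):

// hypothetical equivalent using the built-in reader (Spark 2.x+), tab-separated input
val df = spark.read
  .option("sep", "\t")
  .schema(schema)
  .csv("path to your text file")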

After that, all you need to do is apply the avg function on the grouped DataFrame:

import org.apache.spark.sql.functions._
val res1 = df.groupBy("ID").agg(avg("col1"),avg("col2"),avg("col3"),avg("col4"),avg("col5"),avg("col6"),avg("col7"))
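
Since the question only asks for the average of col7, you could also restrict the aggregation to that one column (the alias below is just an illustrative name):

// only the column the question asks about, with an explicit output column name
val res1 = df.groupBy("ID").agg(avg("col7").alias("avg_col7"))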

Finally, you can save to CSV directly from the DataFrame; you don't need to convert to an RDD:

  res1.coalesce(1).write.csv("/stuaverage/spoutput12")
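
Note that avg ignores nulls, so an id whose rows never have a col7 value will come out as null rather than 0.0. A quick look at the result before writing is an easy way to confirm the averages are now populated:

// optional sanity check before writing the output
res1.show()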
