pyspark - Grouping and calculating data


Problem description

I have the following csv file.

Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

I have to create an RDD where User, Model and gt form the primary key; I don't know whether I have to use them as a tuple.

Then, once I have the primary-key fields, I have to calculate the AVG, MAX and MIN of 'x', 'y' and 'z'.

The expected output would be:

User,Model,gt,media(x,y,z),desviacion(x,y,z),max(x,y,z),min(x,y,z)
a, nexus4,stand,-3.0,0.7,8.2,2.8,0.14,0.0,-1.0,0.8,8.2,-5.0,0.6,8.2

Any idea how to group them and, for example, get the media (mean) values of "x"?

With my current code I get the following.

# Data loading
lectura = sc.textFile("Phones_accelerometer.csv")

# key = (User, Model, gt), value = (x, y, z) -- note the values are still strings here
datos = lectura.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]), (x.split(",")[3], x.split(",")[4], x.split(",")[5])))

sumCount = datos.combineByKey(lambda value: (value, 1), lambda x, value: (x[0] + value, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))

An example of my tuples:

   [(('a', 'nexus4', 'stand'), ('-5.958191', '0.6880646', '8.135345'))]
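
For reference, a minimal sketch of how this RDD approach could be completed, assuming the header line is filtered out and the x, y and z values are cast to float first (aggregateByKey is used here as one possible way to accumulate sum, count, max and min per key; it is not the code from the question):

# Sketch: RDD-based aggregation keyed by (User, Model, gt)
lectura = sc.textFile("Phones_accelerometer.csv")
header = lectura.first()

datos = (lectura.filter(lambda line: line != header)
                .map(lambda line: line.split(","))
                .map(lambda f: ((f[6], f[7], f[9]),
                                (float(f[3]), float(f[4]), float(f[5])))))

# accumulator: (sum_x, sum_y, sum_z, max_x, max_y, max_z, min_x, min_y, min_z, count)
zero = (0.0, 0.0, 0.0,
        float('-inf'), float('-inf'), float('-inf'),
        float('inf'), float('inf'), float('inf'), 0)

def seq_op(acc, v):
    x, y, z = v
    return (acc[0] + x, acc[1] + y, acc[2] + z,
            max(acc[3], x), max(acc[4], y), max(acc[5], z),
            min(acc[6], x), min(acc[7], y), min(acc[8], z),
            acc[9] + 1)

def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2],
            max(a[3], b[3]), max(a[4], b[4]), max(a[5], b[5]),
            min(a[6], b[6]), min(a[7], b[7]), min(a[8], b[8]),
            a[9] + b[9])

# avg = sum / count for each axis; max and min are carried through directly
stats = datos.aggregateByKey(zero, seq_op, comb_op) \
             .mapValues(lambda a: ((a[0] / a[9], a[1] / a[9], a[2] / a[9]),
                                   (a[3], a[4], a[5]),
                                   (a[6], a[7], a[8])))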

Solution

If you have the csv data in a file as given in the question, then you can use sqlContext to read it as a dataframe and cast the appropriate types:

import pyspark.sql.functions as F
import pyspark.sql.types as T

# read the csv with its header row; "path to csv file" is a placeholder
df = sqlContext.read.format("com.databricks.spark.csv").option("header", True).load("path to csv file")

# keep only the primary-key columns and cast the measurements to float
df = df.select(F.col('User'), F.col('Model'), F.col('gt'), F.col('x').cast('float'), F.col('y').cast('float'), F.col('z').cast('float'))

I have selected only the primary keys and the necessary columns, which should give you:

+----+------+-----+----------+---------+--------+
|User|Model |gt   |x         |y        |z       |
+----+------+-----+----------+---------+--------+
|a   |nexus4|stand|-5.958191 |0.6880646|8.135345|
|a   |nexus4|stand|-5.95224  |0.6702118|8.136536|
|a   |nexus4|stand|-5.9950867|0.6535492|8.204376|
|a   |nexus4|stand|-5.9427185|0.6761627|8.128204|
+----+------+-----+----------+---------+--------+

All of your requirements, the mean (the media of the expected output), the deviation, the max and the min, depend on the list of x, y and z values collected per group when grouping by the primary keys: User, Model and gt.
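
For example, for the four sample rows the mean of x would be (-5.958191 - 5.95224 - 5.9950867 - 5.9427185) / 4 ≈ -5.962059, which is the value that should appear in the grouped result.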

So you would need the groupBy and collect_list built-in functions and a udf function to calculate all of your requirements. The final step is to separate them into different columns, as given below:

from math import sqrt

def calculation(array):
    # returns [mean, sample standard deviation, max, min] for the collected values
    num_items = len(array)
    mean = sum(array) / num_items
    differences = [x - mean for x in array]
    sq_differences = [d ** 2 for d in differences]
    ssd = sum(sq_differences)
    variance = ssd / (num_items - 1)   # sample variance; assumes at least two values per group
    sd = sqrt(variance)
    return [mean, sd, max(array), min(array)]

calcUdf = F.udf(calculation, T.ArrayType(T.FloatType()))

df.groupBy('User', 'Model', 'gt')\
    .agg(calcUdf(F.collect_list(F.col('x'))).alias('x'),
         calcUdf(F.collect_list(F.col('y'))).alias('y'),
         calcUdf(F.collect_list(F.col('z'))).alias('z'))\
    .select(F.col('User'), F.col('Model'), F.col('gt'),
            F.col('x')[0].alias('mean_x'), F.col('y')[0].alias('mean_y'), F.col('z')[0].alias('mean_z'),
            F.col('x')[1].alias('deviation_x'), F.col('y')[1].alias('deviation_y'), F.col('z')[1].alias('deviation_z'),
            F.col('x')[2].alias('max_x'), F.col('y')[2].alias('max_y'), F.col('z')[2].alias('max_z'),
            F.col('x')[3].alias('min_x'), F.col('y')[3].alias('min_y'), F.col('z')[3].alias('min_z'))\
    .show(truncate=False)
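
As a quick sanity check, calling the calculation function directly on the four sample x values from the question gives approximately the figures that the x columns should show:

print(calculation([-5.958191, -5.95224, -5.9950867, -5.9427185]))
# roughly [-5.962059, 0.022922, -5.9427185, -5.9950867]  (mean, deviation, max, min)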

So finally you should have:

+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
|User|Model |gt   |mean_x   |mean_y   |mean_z  |deviation_x|deviation_y|deviation_z|max_x     |max_y    |max_z   |min_x     |min_y    |min_z   |
+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
|a   |nexus4|stand|-5.962059|0.6719971|8.151115|0.022922019|0.01436464 |0.0356973  |-5.9427185|0.6880646|8.204376|-5.9950867|0.6535492|8.128204|
+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
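
As a side note, on Spark 2.x the same aggregates can also be computed without collect_list and a udf by using the built-in aggregate functions; a minimal sketch, assuming the same df as above (the column order differs slightly from the udf version):

aggs = []
for c in ['x', 'y', 'z']:
    # stddev_samp matches the sample standard deviation computed by the udf
    aggs += [F.avg(c).alias('mean_' + c), F.stddev_samp(c).alias('deviation_' + c),
             F.max(c).alias('max_' + c), F.min(c).alias('min_' + c)]

df.groupBy('User', 'Model', 'gt').agg(*aggs).show(truncate=False)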

I hope the answer is helpful.
