聚合函数在 Spark 中使用 groupBy 计算使用情况 [英] aggregate function Count usage with groupBy in Spark

查看:30
本文介绍了聚合函数在 Spark 中使用 groupBy 计算使用情况的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图在 pySpark 的一行代码中进行多项操作,并且不确定我的情况是否可行.

I'm trying to make multiple operations in one line of code in pySpark, and not sure if that's possible for my case.

我的目的不是将输出保存为新的数据帧.

My intention is not having to save the output as a new dataframe.

我当前的代码相当简单:

My current code is rather simple:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

我的目的是在使用 groupBy 之后添加 count(),以获得与 timePeriod 的每个值匹配的记录数 列,打印\显示为输出.

And my intention is to add count() after using groupBy, to get, well, the count of records matching each value of timePeriod column, printed\shown as output.

当我尝试使用 groupBy(..).count().agg(..) 时出现异常.

When trying to use groupBy(..).count().agg(..) I get exceptions.

有什么办法可以同时实现 count()agg().show() 打印,而无需将代码拆分为两行命令,例如:

Is there any way to achieve both count() and agg().show() prints, without splitting code to two lines of commands, e.g. :

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

或者更好的是,将合并的输出合并到 agg.show() 输出 - 一个额外的列,它说明与行值匹配的记录计数.例如:

Or better yet, for getting a merged output to agg.show() output - An extra column which states the counted number of records matching the row's value. e.g.:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315

推荐答案

count() 可以在 agg() 中作为 groupBy 使用表达是一样的.

count() can be used inside agg() as groupBy expression is same.

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)

pySpark SQL函数文档

import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1) 将按等于 count(timePeriod")

import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

这篇关于聚合函数在 Spark 中使用 groupBy 计算使用情况的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆