Spark Dataframes - Reducing By Key


Question

Let's say I have a data structure like this where ts is some timestamp:

case class Record(ts: Long, id: Int, value: Int)

Given a large number of these records, I want to end up with the record with the highest timestamp for each id. Using the RDD API, I think the following code gets the job done:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x, y) => if(x.ts > y.ts) x else y
  }.values
}

Likewise, this is my attempt with datasets:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id, records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}

I've been trying to work out how to achieve something similar with dataframes, but to no avail. I realise I can do the grouping with:

records.groupBy($"id")

But that gives me a RelationalGroupedDataset, and it's not clear to me what aggregation function I need to write to achieve what I want; all the example aggregations I've seen appear to focus on returning a single aggregated column rather than the whole row.

Is it possible to achieve this using dataframes?

Answer

You can use the argmax logic (see the Databricks example).

For example, let's say your dataframe is called df and it has the columns id, val, and ts; you would do something like this:

import org.apache.spark.sql.functions._

// keep, for each id, the whole (ts, val) struct with the greatest ts
val newDF = df.groupBy('id).agg(max(struct('ts, 'val)) as 'tmp).select($"id", $"tmp.*")
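
If it helps, here is a minimal, self-contained sketch of the same argmax pattern applied to the Record case class from the question; the sample rows, the local SparkSession, and the latest name are just illustrative assumptions, not part of the original answer:

// Minimal sketch, assuming a local SparkSession; sample data is made up.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

case class Record(ts: Long, id: Int, value: Int)

val spark = SparkSession.builder().master("local[*]").appName("argmax-example").getOrCreate()
import spark.implicits._

val df = Seq(
  Record(100L, 1, 10),
  Record(200L, 1, 20),  // latest row for id 1
  Record(150L, 2, 30)   // only (and therefore latest) row for id 2
).toDF()

// max over a struct compares its fields left to right, so ts must be the first
// field; value just rides along inside the struct and is unpacked by the select.
val latest = df
  .groupBy($"id")
  .agg(max(struct($"ts", $"value")) as "tmp")
  .select($"id", $"tmp.ts" as "ts", $"tmp.value" as "value")

latest.show()  // one row per id: (1, 200, 20) and (2, 150, 30)

The one thing to watch is the field order inside struct: the comparison is lexicographic over its fields, so the timestamp has to come first for the max to mean "latest".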
