Spark and SparkSQL: How to imitate window function?


Problem description

Given a DataFrame df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04

I want to create a running counter or index,

  • grouped by the same id and
  • ordered by date within that group,

so that

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2

This is something I can achieve with a window function, e.g.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), df("date"), rowNumber().over(w).as("counter") )

Unfortunately, Spark 1.4.1 does not support window functions for regular DataFrames:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
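
As the error message suggests, the same window-function code does run on Spark 1.4.1 when the DataFrame is created through a HiveContext rather than a plain SQLContext. A minimal sketch, assuming an existing SparkContext named sc, the spark-hive artifact on the classpath, and the example data above:

// Sketch only: in Spark 1.4.x, window functions require a HiveContext-backed DataFrame.
// Assumes an existing SparkContext `sc`; data and column names follow the example above.
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

val df = sc.parallelize(Seq(
  (1, "2015-09-01"), (2, "2015-09-01"),
  (1, "2015-09-03"), (1, "2015-09-04"), (2, "2015-09-04")
)).toDF("id", "date")

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select($"id", $"date", rowNumber().over(w).as("counter"))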

Questions

  • How can I achieve the above computation on current Spark 1.4.1 without using window functions?
  • When will window functions for regular DataFrames be supported in Spark?
  • Thanks!

    Recommended answer

    You can do this with RDDs. Personally, I find the RDD API makes a lot more sense - I don't always want my data to be 'flat' like a DataFrame.

    val df = sqlContext.sql("select 1, '2015-09-01'"
        ).unionAll(sqlContext.sql("select 2, '2015-09-01'")
        ).unionAll(sqlContext.sql("select 1, '2015-09-03'")
        ).unionAll(sqlContext.sql("select 1, '2015-09-04'")
        ).unionAll(sqlContext.sql("select 2, '2015-09-04'"))
    
    // dataframe as an RDD (of Row objects)
    df.rdd 
      // grouping by the first column of the row
      .groupBy(r => r(0)) 
      // map each group - an Iterable[Row] - to a list and sort by the second column
      .map(g => g._2.toList.sortBy(row => row(1).toString))     
      .collect()
    

    The above gives the following result:

    Array[List[org.apache.spark.sql.Row]] = 
    Array(
      List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
      List([2,2015-09-01], [2,2015-09-04]))
    

    If you also want the position within the 'group', you can use zipWithIndex.

    df.rdd.groupBy(r => r(0)).map(g => 
        g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()
    
    Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
      List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
      List(([2,2015-09-01],0), ([2,2015-09-04],1)))
    

    You could flatten this back to a simple list/array of Row objects using flatMap, but if you still need to do anything on the 'group' afterwards, that won't be a great idea.
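
    For completeness, a sketch of that flatMap flattening, producing (id, date, counter) tuples directly; the getInt/getString calls assume the Int and String column types of the example data, and the +1 turns the 0-based zipWithIndex position into the 1-based counter from the question:

    // Sketch: flatten the grouped, sorted, zipped result into (id, date, counter) tuples.
    val withCounter = df.rdd
      .groupBy(r => r(0))
      .flatMap { case (_, rows) =>
        rows.toList
          .sortBy(row => row(1).toString)
          .zipWithIndex
          .map { case (row, idx) => (row.getInt(0), row.getString(1), idx + 1) }
      }
    // withCounter.collect() yields one (id, date, counter) tuple per input row;
    // ordering across groups is not guaranteed.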

    The downside of using RDDs like this is that it's tedious to convert from a DataFrame to an RDD and back again.
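
    If a DataFrame is needed again, the way back is short; a sketch reusing the withCounter RDD from the flattening sketch above (column names here are illustrative):

    // Sketch: convert the flattened (Int, String, Int) RDD back into a DataFrame.
    import sqlContext.implicits._

    val resultDF = withCounter.toDF("id", "date", "counter")
    resultDF.show()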
