SparkSQL - Lag function?


Question

I see in this DataBricks post that there is support for window functions in SparkSQL; in particular, I'm trying to use the lag() window function.

I have rows of credit card transactions, and I've sorted them. Now I want to iterate over the rows and, for each row, display the amount of the transaction and the difference between the current row's amount and the preceding row's amount.

Following the DataBricks post, I've come up with this query, but it's throwing an exception at me and I can't quite understand why:

This is in PySpark. tx is my DataFrame, already created and registered as a temp table.

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

And the exception (truncated):

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

I'd really appreciate any insight; this functionality is rather new and there's not a lot to go on as far as existing examples or other related posts.

EDIT

I've also attempted to do this without a SQL statement, as below, but continue to get an error. I've tried this with both HiveContext and SQLContext, and receive the same error.

from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = \
    Window \
        .partitionBy(h_tx_df_ordered['cc_num']) \
        .orderBy(h_tx_df_ordered['cc_num'], h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
    (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])

h_tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()


Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)

Answer

  1. The frame specification should start with the keyword ROWS, not ROW.

  2. The frame specification requires either a lower bound value:

     ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

     or the UNBOUNDED keyword:

     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
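
For contrast, a frame specification like the ones above is valid with aggregate window functions. A minimal, hedged sketch of a per-card running total (the running_total alias is made up for illustration, and it assumes the tx temp table and a HiveContext-backed sqlContext from the question):

    # Hypothetical illustration: aggregates such as SUM accept a frame,
    # unlike LAG (see the next point).
    running = sqlContext.sql("""
        SELECT cc_num, trans_date, trans_time, amt,
               SUM(amt) OVER (
                   PARTITION BY cc_num
                   ORDER BY trans_date, trans_time
                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
               ) AS running_total
        FROM tx""")
    running.show()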

  • The LAG function doesn't accept a frame at all, so a correct SQL query with lag can look like this:

    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
         PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time
    ) as prev_amt from tx
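
Since the original goal was the difference between the current and the preceding amount, the lag can be wrapped directly in the arithmetic. A hedged PySpark sketch (assuming the tx temp table from the question, a HiveContext-backed sqlContext, and an illustrative amt_diff alias):

    # Sketch: prev_amt plus the current-minus-previous difference in one query;
    # the first row of each cc_num partition gets NULL from LAG.
    test = sqlContext.sql("""
        SELECT cc_num, trans_date, trans_time, amt,
               LAG(amt) OVER (
                   PARTITION BY cc_num ORDER BY trans_date, trans_time
               ) AS prev_amt,
               amt - LAG(amt) OVER (
                   PARTITION BY cc_num ORDER BY trans_date, trans_time
               ) AS amt_diff
        FROM tx""")
    test.show()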
    

  • EDIT:

    Regarding SQL DSL usage:

    1. As you can read in the error message:

       Note that, using window functions currently requires a HiveContext


       Make sure you use HiveContext instead of SQLContext.

    2. windowSpec.rowsBetween(-1, 0) does nothing, but once again the frame specification is not supported by the lag function.
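
    Putting both points together, a minimal corrected sketch of the DataFrame-based attempt (assuming h_tx_df_ordered was created through a HiveContext, and with an illustrative amt_diff column):

    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag

    # Note: no rowsBetween(...) here; lag doesn't take a frame specification.
    windowSpec = (Window
                  .partitionBy(h_tx_df_ordered['cc_num'])
                  .orderBy(h_tx_df_ordered['trans_date'],
                           h_tx_df_ordered['trans_time']))

    prev_amt = lag(h_tx_df_ordered['amt']).over(windowSpec)

    h_tx_df_ordered.select(
        h_tx_df_ordered['cc_num'],
        h_tx_df_ordered['trans_date'],
        h_tx_df_ordered['trans_time'],
        h_tx_df_ordered['amt'],
        prev_amt.alias('prev_amt'),
        (h_tx_df_ordered['amt'] - prev_amt).alias('amt_diff')).show()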
