SparkSQL - Lag function?


Question

I see in this Databricks post that there is support for window functions in SparkSQL; in particular, I'm trying to use the lag() window function.

I have rows of credit card transactions, and I've sorted them; now I want to iterate over the rows and, for each row, display the transaction amount along with the difference between the current row's amount and the preceding row's amount.

Following the Databricks post, I've come up with this query, but it's throwing an exception at me, and I can't quite understand why.

This is in PySpark. tx is my DataFrame, already created and registered as a temp table.

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

and the exception (truncated):

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

I'd really appreciate any insight; this functionality is rather new, and there's not a lot to go on as far as existing examples or other related posts.

Edit

I've also attempted to do this without a SQL statement, as below, but continue to get an error. I've tried this with both Hive and SQLContext, and receive the same error.

windowSpec = \
Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])

tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()


Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)

Solution

  1. Frame specification should start with the keyword ROWS, not ROW
  2. Frame specification requires either a lower bound value

    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    

    or the UNBOUNDED keyword

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    

  3. The LAG function doesn't accept a frame at all, so a correct SQL query with lag can look like this (a variant that also returns the difference is sketched after this list):

    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
         PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time
    ) as prev_amt from tx
    

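Since the stated goal in the question is the difference between the current row's amount and the preceding one, the lagged value can be turned into that difference in a follow-up step. A minimal sketch of how this could be run, reusing the tx table and column names from the question (the prev variable and the amt_diff column are just illustrative names, and sqlContext is assumed to be a HiveContext, as discussed in the edit below):

    prev = sqlContext.sql("""
        SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt,
               LAG(tx.amt) OVER (
                   PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
               ) AS prev_amt
        FROM tx
    """)

    # The difference the question asks for is then an ordinary column expression
    prev.withColumn("amt_diff", prev["amt"] - prev["prev_amt"]).show()

The first transaction of each cc_num has no preceding row, so prev_amt (and therefore amt_diff) is NULL there.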
Edit:

Regarding SQL DSL usage:

  1. As you can read in the error message:

    Note that, using window functions currently requires a HiveContext

    Be sure to initialize sqlContext using HiveContext, not SQLContext (see the sketch after this list).

  2. windowSpec.rowsBetween(-1, 0) does nothing here, since its return value is discarded, and in any case a frame specification is not supported by the lag function.
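Putting the two points together, here is a minimal DataFrame-API sketch under those assumptions; h_tx_df_ordered and its columns come from the question and are assumed to have been created through the HiveContext, while with_prev and amt_diff are just illustrative names:

    from pyspark import SparkContext
    from pyspark.sql import HiveContext, Window
    from pyspark.sql.functions import lag

    sc = SparkContext(appName="lag-example")  # hypothetical app name
    sqlContext = HiveContext(sc)              # HiveContext, not SQLContext
    # ... h_tx_df_ordered is then created/loaded through this sqlContext ...

    windowSpec = (Window
                  .partitionBy(h_tx_df_ordered['cc_num'])
                  .orderBy(h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time']))

    # No rowsBetween / frame specification: lag simply looks one row back within the window
    with_prev = h_tx_df_ordered.select(
        h_tx_df_ordered['cc_num'],
        h_tx_df_ordered['trans_date'],
        h_tx_df_ordered['trans_time'],
        h_tx_df_ordered['amt'],
        lag(h_tx_df_ordered['amt']).over(windowSpec).alias('prev_amt')
    )

    # Derive the difference as a plain column expression on top of the lagged column
    with_prev.withColumn('amt_diff', with_prev['amt'] - with_prev['prev_amt']).show()

As in the SQL version, the first row of each cc_num partition gets NULL for prev_amt and amt_diff.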
