Spark lag function with parameter as dynamic

Question

I need to implement the lag function in Spark, which I was able to do as shown below (with some data from a Hive/temporary Spark table).

Say the DF has these rows:

lagno, value
0, 100
0, 200
2, null
3, null

where the first column is the lag number to use for that row, and the second column is the actual value.

When I run this query it works:

DataFrame df; // loaded from the Hive/temp table
DataFrame dfnew = df.select(
        org.apache.spark.sql.functions.lag(df.col("value"), 1)
                .over(org.apache.spark.sql.expressions.Window.orderBy(df.col("value"))));

That means that if I hard-code the lag number, it works fine.

However, if I pass the lag number as a column parameter, it does not work:

// Fails: the offset argument of lag() must be an int, not a Column
DataFrame dfnew = df.select(
        org.apache.spark.sql.functions.lag(df.col("value"), df.col("lagno"))
                .over(org.apache.spark.sql.expressions.Window.orderBy(df.col("value"))));

Do I need to cast the Column-typed parameter to an integer?

Answer

This is not possible. Window functions use fixed-size frames that cannot be modified dynamically, so the lag offset has to be a constant. Instead, you can compute the lag for each offset from 1 to 3 and then select the one required for the current row:

CASE 
  WHEN lagno = 1 THEN LAG(value,  1) OVER w 
  WHEN lagno = 2 THEN LAG(value,  2) OVER w 
  ...
  ELSE value
END
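To make the semantics of this CASE approach concrete, here is a minimal plain-Java sketch (no Spark involved) that emulates it on an in-memory list. It assumes the rows are already in the order defined by the window (here, simply the order listed in the question), that lag numbers range over 1..3 as in the answer, and that any other lagno falls through to `ELSE value`. The names `DynamicLagSketch`, `LagRow`, `MAX_LAG` and `dynamicLag` are hypothetical and only for illustration; they are not part of the Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DynamicLagSketch {
    // Hypothetical row type mirroring the question's (lagno, value) columns.
    static final class LagRow {
        final int lagno;
        final Integer value; // nullable, like the SQL column

        LagRow(int lagno, Integer value) {
            this.lagno = lagno;
            this.value = value;
        }
    }

    // Assumed upper bound on lagno, taken from the answer's "1..3".
    static final int MAX_LAG = 3;

    // Plain-Java emulation of
    //   CASE WHEN lagno = k THEN LAG(value, k) OVER w ... ELSE value END
    // for k = 1..MAX_LAG. `rows` must already be in the window's ORDER BY order.
    static List<Integer> dynamicLag(List<LagRow> rows) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            // One fixed-offset "LAG" per k, computed for every row
            // (like separate window expressions over the same window).
            Integer[] lagged = new Integer[MAX_LAG + 1];
            for (int k = 1; k <= MAX_LAG; k++) {
                // LAG yields null when there is no row k positions back.
                lagged[k] = (i - k >= 0) ? rows.get(i - k).value : null;
            }
            // The CASE expression: pick the precomputed lag matching this row's lagno.
            int n = rows.get(i).lagno;
            result.add(n >= 1 && n <= MAX_LAG ? lagged[n] : rows.get(i).value);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample rows from the question, assumed to already be in window order.
        List<LagRow> rows = Arrays.asList(
                new LagRow(0, 100),
                new LagRow(0, 200),
                new LagRow(2, null),
                new LagRow(3, null));
        System.out.println(dynamicLag(rows)); // prints [100, 200, 100, 100]
    }
}
```

Every row pays for all three fixed-offset lags before one is chosen, so the cost grows with the largest lag number you need to support; this is the trade-off of working around the constant-offset restriction.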
