How to divide the value of the current row by the following one?


Question


In Spark SQL 1.6, using DataFrames, is there a way to compute, for a specific column, the result of dividing each row's value by the next row's value?

For example, if I have a table with one column, like so

Age
100
50
20
4

I'd like the following output

Fraction
2
2.5
5

The last row is dropped because it has no "next row" to divide by.

Right now I am doing it by ranking the table and joining it with itself on rank = rank + 1.

Is there a better way to do this? Can this be done with a Window function?
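For concreteness, the computation being asked for can be sketched in plain Python (no Spark involved, just the per-row arithmetic on the example column):

```python
# Divide each value by the value in the next row; the last row is
# dropped because it has no next row to divide by.
ages = [100, 50, 20, 4]

fractions = [ages[i] / ages[i + 1] for i in range(len(ages) - 1)]
print(fractions)  # [2.0, 2.5, 5.0]
```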

Solution

A Window function does part of the trick; the rest can be done by defining a udf function

def div = udf((age: Double, lag: Double) => lag/age)

First we need to find the lag using a Window function, and then pass that lag and age into the udf function to compute div:

import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val dataframe = Seq(
  ("A",100),
  ("A",50),
  ("A",20),
  ("A",4)
).toDF("person", "Age")

val windowSpec = Window.partitionBy("person").orderBy(col("Age").desc)
val newDF = dataframe.withColumn("lag", lag(dataframe("Age"), 1) over(windowSpec))

And finally call the udf function, filtering out the rows where lag is null:

newDF.filter(newDF("lag").isNotNull).withColumn("div", div(newDF("Age"), newDF("lag"))).drop("Age", "lag").show

Final output would be

+------+---+
|person|div|
+------+---+
|     A|2.0|
|     A|2.5|
|     A|5.0|
+------+---+
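To see why the output comes out this way, the lag-then-filter-then-divide steps above can be emulated in plain Python (a sketch of the semantics only, not the Spark API):

```python
# Emulate lag(Age, 1) over a window ordered by Age descending,
# then filter out the null lag and compute div = lag / age,
# mirroring what the udf does.
ages = sorted([100, 50, 20, 4], reverse=True)  # window order: Age desc
lags = [None] + ages[:-1]                      # lag(Age, 1): previous row's Age

pairs = [(age, lag) for age, lag in zip(ages, lags) if lag is not None]
div = [lag / age for age, lag in pairs]
print(div)  # [2.0, 2.5, 5.0]
```

The first row in the window has no predecessor, so its lag is null; that is the row the `.filter(newDF("lag").isNotNull)` step removes.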

Edit: As @Jacek suggested, a better solution is to use .na.drop instead of .filter(newDF("lag").isNotNull) and to use the / operator, so we don't even need the udf function:

newDF.na.drop.withColumn("div", newDF("lag")/newDF("Age")).drop("Age", "lag").show
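The effect of `.na.drop` followed by column division can also be emulated in plain Python (a sketch; note that `na.drop` removes any row containing a null in any column, which here only the lag column can be):

```python
# (Age, lag) pairs as produced by the window step; lag is None for
# the first row in the window (Age descending).
rows = [(100, None), (50, 100), (20, 50), (4, 20)]

# na.drop: discard any row containing a null value.
clean = [r for r in rows if None not in r]

# Plain column division lag / Age, no udf needed.
div = [lag / age for age, lag in clean]
print(div)  # [2.0, 2.5, 5.0]
```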
