Cumulative product in Spark?

Question
I am trying to implement a cumulative product in Spark Scala, but I really don't know how to do it. I have the following dataframe:
Input data:
+--+--+--------+----+
|A |B | date | val|
+--+--+--------+----+
|rr|gg|20171103| 2 |
|hh|jj|20171103| 3 |
|rr|gg|20171104| 4 |
|hh|jj|20171104| 5 |
|rr|gg|20171105| 6 |
|hh|jj|20171105| 7 |
+--+--+--------+----+
And I would like to have the following output
Output data:
+--+--+--------+-----+
|A |B | date | val |
+--+--+--------+-----+
|rr|gg|20171105| 48 | // 2 * 4 * 6
|hh|jj|20171105| 105 | // 3 * 5 * 7
+--+--+--------+-----+
If you have any idea about how to do it, it would be really helpful :)
Thanks a lot.
As long as the numbers are strictly positive, as in your example (zeros, if present, can be handled as well using coalesce), the simplest solution is to compute the sum of logarithms and take the exponential. For example, exp(log 2 + log 4 + log 6) = exp(log 48) = 48.
import org.apache.spark.sql.functions.{exp, log, max, round, sum}
import spark.implicits._  // for toDF and the $ column syntax

val df = Seq(
  ("rr", "gg", "20171103", 2), ("hh", "jj", "20171103", 3),
  ("rr", "gg", "20171104", 4), ("hh", "jj", "20171104", 5),
  ("rr", "gg", "20171105", 6), ("hh", "jj", "20171105", 7)
).toDF("A", "B", "date", "val")

val result = df
  .groupBy("A", "B")
  .agg(
    max($"date").as("date"),
    // the product of positive values equals exp(sum(log(values)))
    exp(sum(log($"val"))).as("val"))
Since this uses floating-point arithmetic, the result won't be exact:
result.show
+---+---+--------+------------------+
| A| B| date| val|
+---+---+--------+------------------+
| hh| jj|20171105|104.99999999999997|
| rr| gg|20171105|47.999999999999986|
+---+---+--------+------------------+
but after rounding it should be good enough for the majority of applications.
result.withColumn("val", round($"val")).show
+---+---+--------+-----+
| A| B| date| val|
+---+---+--------+-----+
| hh| jj|20171105|105.0|
| rr| gg|20171105| 48.0|
+---+---+--------+-----+
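If zeros can appear, note that Spark's log returns null for non-positive inputs, so sum would silently skip those rows. Here is a minimal sketch of one way to handle that explicitly, using when and min rather than the coalesce mentioned above (it assumes all values are non-negative):

import org.apache.spark.sql.functions.{lit, min, when}

// A sketch: if any value in the group is 0 the product is 0;
// otherwise fall back to the log/exp trick above.
val resultWithZeros = df
  .groupBy("A", "B")
  .agg(
    max($"date").as("date"),
    when(min($"val") === 0, lit(0.0))
      .otherwise(exp(sum(log($"val")))).as("val"))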
If that's not enough, you can define a UserDefinedAggregateFunction or Aggregator (see How to define and use a User-Defined Aggregate Function in Spark SQL?) or use the functional API with reduceGroups:
import scala.math.Ordering

case class Record(A: String, B: String, date: String, value: Long)

df.withColumnRenamed("val", "value").as[Record]
  // group by the (A, B) key
  .groupByKey(x => (x.A, x.B))
  // keep the latest date and multiply the values exactly
  .reduceGroups((x, y) => x.copy(
    date = Ordering[String].max(x.date, y.date),
    value = x.value * y.value))
  .toDF("key", "value")
  .select($"value.*")
  .show
+---+---+--------+-----+
| A| B| date|value|
+---+---+--------+-----+
| hh| jj|20171105| 105|
| rr| gg|20171105| 48|
+---+---+--------+-----+
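For completeness, a minimal sketch of the typed Aggregator route mentioned above (the name productAgg is illustrative; this computes only the exact product, so keeping the latest date would need a second aggregation):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Exact product over the `value` field of Record.
val productAgg = new Aggregator[Record, Long, Long] {
  def zero: Long = 1L                        // neutral element of *
  def reduce(acc: Long, r: Record): Long = acc * r.value
  def merge(a: Long, b: Long): Long = a * b  // combine partial results
  def finish(acc: Long): Long = acc
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}.toColumn

df.withColumnRenamed("val", "value").as[Record]
  .groupByKey(x => (x.A, x.B))
  .agg(productAgg.name("value"))
  .show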