使用Spark展平滑动窗口中的行 [英] Flatten rows in sliding window using Spark
问题描述
我正在使用Apache Spark处理来自数据库或文件的大量行.处理的一部分将创建一个3行的滑动窗口,其中需要对行进行展平,并对展平的行执行其他计算.下面是尝试执行的简化示例.
I'm processing a large number of rows from either a database or a file using Apache Spark. Part of the processing creates a sliding window of 3 rows where the rows need to flattened and additional calculations performed on the flattened rows. Below is a simplified example of what is trying to be done.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.Window
object Main extends App {
val ss = SparkSession.builder().appName("DataSet Test")
.master("local[*]").getOrCreate()
import ss.implicits._
case class Foo(a:Int, b:String )
// rows from database or file
val foos = Seq(Foo(-18, "Z"),
Foo(-11, "G"),
Foo(-8, "A"),
Foo(-4, "C"),
Foo(-1, "F")).toDS()
// work on 3 rows
val sliding_window_spec = Window.orderBy(desc("a")).rowsBetween( -2, 0)
// flattened object with example computations
case class FooResult(a1:Int, b1:String, a2:Int, b2:String, a3:Int, b3:String, computation1:Int, computation2:String )
// how to convert foo to fooResult???
// flatten 3 rows into 1 and do additional computations on flattened rows
// expected results
val fooResults = Seq(FooResult( -1, "F", -4, "C", -8, "A", -5, "FCA" ),
FooResult( -4, "C", -8, "A", -11, "G", -12, "CAG" ),
FooResult( -8, "A", -11, "G", -18, "Z", -19, "AGZ" )).toDS()
ss.stop()
}
如何将foos转换为fooResults?我正在使用Apache Spark 2.3.0
How can I convert the foos into the fooResults? I'm using Apache Spark 2.3.0
推荐答案
//如何将foo转换为fooResult ???//将3列展平为1,并对展平的行进行其他计算
您可以简单地使用 collect_list
内置函数,只需使用 window
函数您已经定义的,然后按定义 udf
函数,您可以执行计算部分和展平部分.最终,您可以 filter
和展开 struct
列以获得
You can simply use collect_list
inbuilt function using the window
function you've already defined and then by defining a udf
function, you can do the computation part and flattening part. finally you can filter
and expand the struct
column to get your final desired result as
def slidingUdf = udf((list1: Seq[Int], list2:Seq[String])=> {
if(list1.size < 3) null
else {
val zipped = list1.zip(list2)
FooResult(zipped(0)._1, zipped(0)._2, zipped(1)._1, zipped(1)._2, zipped(2)._1, zipped(2)._2, zipped(0)._1+zipped(1)._1, zipped(0)._2+zipped(1)._2+zipped(2)._2)
}
})
foos.select(slidingUdf(collect_list("a").over(sliding_window_spec), collect_list("b").over(sliding_window_spec)).as("test"))
.filter(col("test").isNotNull)
.select(col("test.*"))
.show(false)
应该给您
+---+---+---+---+---+---+------------+------------+
|a1 |b1 |a2 |b2 |a3 |b3 |computation1|computation2|
+---+---+---+---+---+---+------------+------------+
|-1 |F |-4 |C |-8 |A |-5 |FCA |
|-4 |C |-8 |A |-11|G |-12 |CAG |
|-8 |A |-11|G |-18|Z |-19 |AGZ |
+---+---+---+---+---+---+------------+------------+
注意:请记住,案例类应在当前会话的范围之外定义
这篇关于使用Spark展平滑动窗口中的行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!