How do I compare multiple rows of a table using Spark SQL / Data Frames / UDF
Question
I need to compare the i[th] row with the [i-1]th row of col2 (sorted by col1). If item_i of the i[th] row and of the [i-1]th row are different, then I need to increment the count of item_[i-1] by 1.
+--------------+
| col1 col2 |
+--------------+
| row_1 item_1 |
| row_2 item_1 |
| row_3 item_2 |
| row_4 item_1 |
| row_5 item_2 |
| row_6 item_1 |
+--------------+
In the above example, if we scan two rows at a time downwards, we see that row_2 and row_3 are different, so we add one to item_1. Next, we see that row_3 is different from row_4, so we add one to item_2. Continue until we end up with:
+-------------+
| col2 col3 |
+-------------+
| item_1 2 |
| item_2 2 |
+-------------+
A solution using Scala and RDD operations is also acceptable. Thanks, any tips are also appreciated.
Recommended answer
You can use a combination of a window function and an aggregate to do this. The window function is used to get the next value of col2 (using col1 for ordering). The aggregate then counts the times we encounter a difference. This is implemented in the code below:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // for $"..." syntax (pre-imported in spark-shell)

val data = Seq(
  ("row_1", "item_1"),
  ("row_2", "item_1"),
  ("row_3", "item_2"),
  ("row_4", "item_1"),
  ("row_5", "item_2"),
  ("row_6", "item_1")).
  toDF("col1", "col2")

data.
  select(
    $"*",
    // Next value of col2, ordered by col1; the last row has no successor,
    // so fall back to its own value (which counts as "no change").
    coalesce(lead($"col2", 1).over(Window.orderBy($"col1")), $"col2").as("col2_next")
  ).
  groupBy($"col2").
  agg(
    // Count the rows whose successor holds a different item.
    sum(($"col2" =!= $"col2_next").cast("int")).as("col3")
  ).
  show
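If you want to sanity-check the counting logic without a Spark session, the same pairwise comparison can be sketched with plain Scala collections. This is only an illustrative helper (PairwiseCount is a name made up here, not part of the answer's Spark code): it walks adjacent pairs of the col2 values, already sorted by col1, and counts an item each time its successor differs.

```scala
object PairwiseCount {
  // Given the col2 values in col1 order, return how many times each
  // item is immediately followed by a different item.
  def countChanges(items: Seq[String]): Map[String, Int] =
    items
      .sliding(2)                                  // adjacent (i, i+1) pairs
      .collect { case Seq(a, b) if a != b => a }   // keep item_i when the next row differs
      .toSeq
      .groupBy(identity)
      .map { case (item, hits) => item -> hits.size }
}
```

Applied to the example column, Seq("item_1", "item_1", "item_2", "item_1", "item_2", "item_1"), this yields item_1 -> 2 and item_2 -> 2, matching the Spark output above.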