Count of values in a row in a Spark dataframe using Scala
Question
I have a dataframe containing the amount of sales for different items across different sales outlets. The dataframe shown below only includes a few of the items and outlets. There is a benchmark of 100 items sold per day for each item. Each item that sold more than 100 is marked "Yes", and those at or below 100 are marked "No".
val df1 = Seq(
  ("Mumbai", 90, 109, 101, 78, ............., "No", "Yes", "Yes", "No", .....),
  ("Singapore", 149, 129, 201, 107, ............., "Yes", "Yes", "Yes", "Yes", .....),
  ("Hawaii", 127, 101, 98, 109, ............., "Yes", "Yes", "No", "Yes", .....),
  ("New York", 146, 130, 173, 117, ............., "Yes", "Yes", "Yes", "Yes", .....),
  ("Los Angeles", 94, 99, 95, 113, ............., "No", "No", "No", "Yes", .....),
  ("Dubai", 201, 229, 265, 317, ............., "Yes", "Yes", "Yes", "Yes", .....),
  ("Bangalore", 56, 89, 61, 77, ............., "No", "No", "No", "No", .....)
).toDF("Outlet", "Boys_Toys", "Girls_Toys", "Men_Shoes", "Ladies_shoes", ............., "BT>100", "GT>100", "MS>100", "LS>100", .....)
Now, I want to add a column "Count_of_Yes" in which, for each sales outlet (each row), the value is the total number of "Yes" values in that row. How do I iterate over each row to get the count of "Yes"?
My expected dataframe should be:
val output_df = Seq(
  ("Mumbai", 90, 109, 101, 78, ............., "No", "Yes", "Yes", "No", ....., 2),
  ("Singapore", 149, 129, 201, 107, ............., "Yes", "Yes", "Yes", "Yes", ....., 4),
  ("Hawaii", 127, 101, 98, 109, ............., "Yes", "Yes", "No", "Yes", ....., 3),
  ("New York", 146, 130, 173, 117, ............., "Yes", "Yes", "Yes", "Yes", ....., 4),
  ("Los Angeles", 94, 99, 95, 113, ............., "No", "No", "No", "Yes", ....., 1),
  ("Dubai", 201, 229, 265, 317, ............., "Yes", "Yes", "Yes", "Yes", ....., 4),
  ("Bangalore", 56, 89, 61, 77, ............., "No", "No", "No", "No", ....., 0)
).toDF("Outlet", "Boys_Toys", "Girls_Toys", "Men_Shoes", "Ladies_shoes", ............., "BT>100", "GT>100", "MS>100", "LS>100", ....., "Count_of_Yes")
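For context, the "Yes"/"No" flag columns in a setup like this can themselves be derived from the quantity columns with the same >100 benchmark. A minimal sketch, assuming a local SparkSession; the column names and values here are illustrative, not the full dataframe from the question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val sales = Seq(
  ("Mumbai", 90, 109),
  ("Singapore", 149, 129)
).toDF("Outlet", "Boys_Toys", "Girls_Toys")

// Add one "Yes"/"No" flag column per quantity column using the >100 benchmark.
val withFlags = sales.columns.filter(_ != "Outlet").foldLeft(sales) { (acc, c) =>
  acc.withColumn(s"$c>100", when(col(c) > 100, "Yes").otherwise("No"))
}
withFlags.show()
```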
Answer
You can convert the selected list of columns into an Array of 1s (for "Y") and 0s (for "N"), then sum the array elements with the aggregate higher-order function in a SQL expression via selectExpr, as shown below:
import org.apache.spark.sql.functions.{array, col, when}
import spark.implicits._

val df = Seq(
  (1, 120, 80, 150, "Y", "N", "Y"),
  (2, 50, 90, 110, "N", "N", "Y"),
  (3, 70, 160, 90, "N", "Y", "N")
).toDF("id", "qty_a", "qty_b", "qty_c", "over100_a", "over100_b", "over100_c")

val cols = df.columns.filter(_.startsWith("over100_"))

// The aggregate higher-order function requires Spark 2.4+.
df.
  withColumn("arr", array(cols.map(c => when(col(c) === "Y", 1).otherwise(0)): _*)).
  selectExpr("*", "aggregate(arr, 0, (acc, x) -> acc + x) as yes_count").
  show
// +---+-----+-----+-----+---------+---------+---------+---------+---------+
// | id|qty_a|qty_b|qty_c|over100_a|over100_b|over100_c|      arr|yes_count|
// +---+-----+-----+-----+---------+---------+---------+---------+---------+
// |  1|  120|   80|  150|        Y|        N|        Y|[1, 0, 1]|        2|
// |  2|   50|   90|  110|        N|        N|        Y|[0, 0, 1]|        1|
// |  3|   70|  160|   90|        N|        Y|        N|[0, 1, 0]|        1|
// +---+-----+-----+-----+---------+---------+---------+---------+---------+
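The intermediate array is not strictly required: the same count can be computed by summing the 0/1 when expressions directly with reduce. A sketch assuming the same df and cols as above (note reduce requires cols to be non-empty):

```scala
import org.apache.spark.sql.functions.{col, when}

// One 0/1 expression per flag column, summed into a single column expression.
val yesCount = cols.map(c => when(col(c) === "Y", 1).otherwise(0)).reduce(_ + _)
df.withColumn("yes_count", yesCount).show
```

This avoids the extra arr column and works on Spark versions older than 2.4, where the aggregate SQL function is unavailable.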
Alternatively, use explode and groupBy/agg to sum the Array elements:
import org.apache.spark.sql.functions.{array, col, explode, sum, when}

df.
  withColumn("arr", array(cols.map(c => when(col(c) === "Y", 1).otherwise(0)): _*)).
  withColumn("flattened", explode($"arr")).
  groupBy("id").agg(sum($"flattened").as("yes_count"))
// Note: the result keeps only id and yes_count; join back to df on id
// if the other columns are needed.
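For reference, the SQL expression aggregate(arr, 0, (acc, x) -> acc + x) used in the first approach behaves like a left fold over the array, which can be mirrored in plain Scala:

```scala
// aggregate(arr, 0, (acc, x) -> acc + x) is equivalent to a foldLeft:
// start from 0 and add each element of the array in turn.
val arr = Seq(1, 0, 1)
val yesCount = arr.foldLeft(0)((acc, x) => acc + x)
println(yesCount)  // prints 2, matching yes_count for id 1 above
```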