Spark 1.6 SQL or Dataframe or Windows
Problem description
I have a data dump of work orders as shown below. I need to identify the orders that have both an 'In Progress' status and a 'Finished' status.
More precisely, an order should be displayed only when its 'In Progress' status appears together with a 'Finished' or 'Not Valid' status. The expected output is shown below. What is the best approach to this in Spark? The input and output are included here.
Input
Work_Req_Id,Assigned to,Date,Status
R1,John,3/4/15,In Progress
R1,George,3/5/15,In Progress
R2,Peter,3/6/15,In Progress
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R4,Patrick,3/9/15,Finished
R4,Patrick,3/10/15,Not Valid
R5,Peter,3/11/15,Finished
R6,,3/12/15,Not Valid
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R8,John,3/15/15,Finished
R8,John,3/16/15,Failed
R9,Alaxender,3/17/15,Finished
R9,John,3/18/15,Removed
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold
Output
Work_Req_Id,Assigned to,Date,Status
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold
You can use groupBy with collect_list to collect the list of statuses per Work_Req_Id, plus a UDF to filter for the wanted status combinations; the grouped dataframe is then joined back to the original dataframe. Window functions aren't proposed here because Spark 1.6 doesn't appear to support collect_list/collect_set in window operations.

// In the Spark 1.6 shell, $-interpolation and toDF come from the implicits,
// and collect_list/udf from the functions object.
import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._

val df = Seq(
  ("R1", "John", "3/4/15", "In Progress"),
  ("R1", "George", "3/5/15", "In Progress"),
  ("R2", "Peter", "3/6/15", "In Progress"),
  ("R3", "Alaxender", "3/7/15", "Finished"),
  ("R3", "Alaxender", "3/8/15", "In Progress"),
  ("R4", "Patrick", "3/9/15", "Finished"),
  ("R4", "Patrick", "3/10/15", "Not Valid"),
  ("R5", "Peter", "3/11/15", "Finished"),
  ("R6", "", "3/12/15", "Not Valid"),
  ("R7", "George", "3/13/15", "Not Valid"),
  ("R7", "George", "3/14/15", "In Progress"),
  ("R8", "John", "3/15/15", "Finished"),
  ("R8", "John", "3/16/15", "Failed"),
  ("R9", "Alaxender", "3/17/15", "Finished"),
  ("R9", "John", "3/18/15", "Removed"),
  ("R10", "Patrick", "3/19/15", "In Progress"),
  ("R10", "Patrick", "3/20/15", "Finished"),
  ("R10", "Peter", "3/21/15", "Hold")  // Peter, per the input data
).toDF("Work_Req_Id", "Assigned_To", "Date", "Status")
def wanted = udf(
  (statuses: Seq[String]) => statuses.contains("In Progress") &&
    (statuses.contains("Finished") || statuses.contains("Not Valid"))
)

val df2 = df.groupBy($"Work_Req_Id").
  agg(collect_list($"Status").as("Statuses")).
  where(wanted($"Statuses")).
  drop($"Statuses")
df.join(df2, Seq("Work_Req_Id")).show
// +-----------+-----------+-------+-----------+
// |Work_Req_Id|Assigned_To|   Date|     Status|
// +-----------+-----------+-------+-----------+
// |         R3|  Alaxender| 3/7/15|   Finished|
// |         R3|  Alaxender| 3/8/15|In Progress|
// |         R7|     George|3/13/15|  Not Valid|
// |         R7|     George|3/14/15|In Progress|
// |        R10|    Patrick|3/19/15|In Progress|
// |        R10|    Patrick|3/20/15|   Finished|
// |        R10|      Peter|3/21/15|       Hold|
// +-----------+-----------+-------+-----------+
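The selection rule itself can be sanity-checked without a Spark cluster. The sketch below (plain Scala collections, illustrative only) mirrors the groupBy + collect_list + where pipeline on the sample data, keeping just the two columns the predicate needs:

```scala
// Sample data reduced to (Work_Req_Id, Status) pairs -- the only
// columns the filtering predicate looks at.
val rows = List(
  "R1" -> "In Progress", "R1" -> "In Progress", "R2" -> "In Progress",
  "R3" -> "Finished",    "R3" -> "In Progress",
  "R4" -> "Finished",    "R4" -> "Not Valid",
  "R5" -> "Finished",    "R6" -> "Not Valid",
  "R7" -> "Not Valid",   "R7" -> "In Progress",
  "R8" -> "Finished",    "R8" -> "Failed",
  "R9" -> "Finished",    "R9" -> "Removed",
  "R10" -> "In Progress", "R10" -> "Finished", "R10" -> "Hold"
)

// Same predicate as the UDF: "In Progress" must co-occur with
// "Finished" or "Not Valid" within one order.
def wanted(statuses: Seq[String]): Boolean =
  statuses.contains("In Progress") &&
    (statuses.contains("Finished") || statuses.contains("Not Valid"))

// Group by order id and keep the ids whose status list qualifies --
// the collection analogue of groupBy + collect_list + where.
val keepIds: Set[String] = rows.groupBy(_._1).collect {
  case (id, rs) if wanted(rs.map(_._2)) => id
}.toSet
// keepIds == Set("R3", "R7", "R10")
```

Joining those ids back against the full rows keeps every row of R3, R7 and R10, which matches the expected output table above.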