Spark 1.6 SQL or Dataframe or Windows

I have a data dump of work orders as shown below. I need to identify the orders that have both the 'In Progress' and 'Finished' statuses.

Also, an order should be displayed only when it has an 'In Progress' status together with a 'Finished' or 'Not Valid' status. The expected output is shown below. What is the best approach to achieve this in Spark? The input and output are attached here.

Input

Work_Req_Id,Assigned to,Date,Status
R1,John,3/4/15,In Progress
R1,George,3/5/15,In Progress
R2,Peter,3/6/15,In Progress
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R4,Patrick,3/9/15,Finished
R4,Patrick,3/10/15,Not Valid
R5,Peter,3/11/15,Finished
R6,,3/12/15,Not Valid
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R8,John,3/15/15,Finished
R8,John,3/16/15,Failed
R9,Alaxender,3/17/15,Finished
R9,John,3/18/15,Removed
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold

Output

Work_Req_Id,Assigned to,Date,Status
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold

Solution

You can use groupBy with collect_list to collect the list of statuses per Work_Req_Id, together with a UDF that filters for the wanted status combination. The grouped dataframe is then joined back with the original dataframe.

Window functions aren't proposed here, since Spark 1.6 doesn't appear to support collect_list/collect_set in window operations; a window-based sketch using plain aggregate functions is included after the main solution below.

// Imports needed beyond the spark-shell defaults (sqlContext as provided by the Spark 1.6 shell)
import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._

val df = Seq(
  ("R1", "John", "3/4/15", "In Progress"),
  ("R1", "George", "3/5/15", "In Progress"),
  ("R2", "Peter", "3/6/15", "In Progress"),
  ("R3", "Alaxender", "3/7/15", "Finished"),
  ("R3", "Alaxender", "3/8/15", "In Progress"),
  ("R4", "Patrick", "3/9/15", "Finished"),
  ("R4", "Patrick", "3/10/15", "Not Valid"),
  ("R5", "Peter", "3/11/15", "Finished"),
  ("R6", "", "3/12/15", "Not Valid"),
  ("R7", "George", "3/13/15", "Not Valid"),
  ("R7", "George", "3/14/15", "In Progress"),
  ("R8", "John", "3/15/15", "Finished"),
  ("R8", "John", "3/16/15", "Failed"),
  ("R9", "Alaxender", "3/17/15", "Finished"),
  ("R9", "John", "3/18/15", "Removed"),
  ("R10", "Patrick", "3/19/15", "In Progress"),
  ("R10", "Patrick", "3/20/15", "Finished"),
  ("R10", "Patrick", "3/21/15", "Hold")
).toDF("Work_Req_Id", "Assigned_To", "Date", "Status")

// UDF that keeps an order only when its statuses include 'In Progress'
// together with 'Finished' or 'Not Valid'
def wanted = udf(
  (statuses: Seq[String]) => statuses.contains("In Progress") &&
    (statuses.contains("Finished") || statuses.contains("Not Valid"))
)

// Collect the statuses per Work_Req_Id and keep only the qualifying orders
val df2 = df.groupBy($"Work_Req_Id").agg(collect_list($"Status").as("Statuses")).
  where( wanted($"Statuses") ).
  drop($"Statuses")

df.join(df2, Seq("Work_Req_Id")).show

// +-----------+-----------+-------+-----------+
// |Work_Req_Id|Assigned_To|   Date|     Status|
// +-----------+-----------+-------+-----------+
// |         R3|  Alaxender| 3/7/15|   Finished|
// |         R3|  Alaxender| 3/8/15|In Progress|
// |         R7|     George|3/13/15|  Not Valid|
// |         R7|     George|3/14/15|In Progress|
// |        R10|    Patrick|3/19/15|In Progress|
// |        R10|    Patrick|3/20/15|   Finished|
// |        R10|      Peter|3/21/15|       Hold|
// +-----------+-----------+-------+-----------+
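
For completeness, since the title also asks about windows: although collect_list isn't usable inside a window in Spark 1.6, plain aggregate window functions such as max are available (window functions in 1.6 require a HiveContext). Below is a minimal sketch of that alternative, assuming the same df as above; the helper column names hasInProgress/hasDone are chosen here purely for illustration.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, when}

// Requires a HiveContext in Spark 1.6, since window functions are not
// supported by the plain SQLContext in that version.
val w = Window.partitionBy($"Work_Req_Id")

// Flag, per Work_Req_Id, whether the order was ever 'In Progress' and
// whether it ever reached 'Finished' or 'Not Valid', then keep only
// the orders where both flags are set.
val viaWindow = df.
  withColumn("hasInProgress",   // illustrative helper column name
    max(when($"Status" === "In Progress", 1).otherwise(0)).over(w)).
  withColumn("hasDone",         // illustrative helper column name
    max(when($"Status" === "Finished" || $"Status" === "Not Valid", 1).otherwise(0)).over(w)).
  where($"hasInProgress" === 1 && $"hasDone" === 1).
  drop("hasInProgress").drop("hasDone")

viaWindow.show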

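Since the title also mentions SQL, the same condition can be phrased as a Spark SQL query against a temporary table. This too is only a sketch, assuming the df defined above; the temp table name work_orders and the alias names are illustrative.

// Register the DataFrame as a temporary table so it can be queried with SQL.
// "work_orders" is an illustrative name, not taken from the original post.
df.registerTempTable("work_orders")

val viaSql = sqlContext.sql("""
  SELECT t.*
  FROM work_orders t
  JOIN (
    SELECT Work_Req_Id
    FROM work_orders
    GROUP BY Work_Req_Id
    HAVING MAX(CASE WHEN Status = 'In Progress' THEN 1 ELSE 0 END) = 1
       AND MAX(CASE WHEN Status IN ('Finished', 'Not Valid') THEN 1 ELSE 0 END) = 1
  ) f ON t.Work_Req_Id = f.Work_Req_Id
""")

viaSql.show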