Spark 1.6 SQL or Dataframe or Windows


Problem description

I have a data dump of work orders as below. I need to identify the orders that have had both the 'In Progress' and 'Finished' statuses.

Also, orders should be displayed only when an 'In Progress' status appears together with a 'Finished' or 'Not Valid' status. The expected output is shown below. What is the best approach for this in Spark? The input and output are attached here.

Input

Work_Req_Id,Assigned to,Date,Status
R1,John,3/4/15,In Progress
R1,George,3/5/15,In Progress
R2,Peter,3/6/15,In Progress
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R4,Patrick,3/9/15,Finished
R4,Patrick,3/10/15,Not Valid
R5,Peter,3/11/15,Finished
R6,,3/12/15,Not Valid
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R8,John,3/15/15,Finished
R8,John,3/16/15,Failed
R9,Alaxender,3/17/15,Finished
R9,John,3/18/15,Removed
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold

Output

Work_Req_Id,Assigned to,Date,Status
R3,Alaxender,3/7/15,Finished
R3,Alaxender,3/8/15,In Progress
R7,George,3/13/15,Not Valid
R7,George,3/14/15,In Progress
R10,Patrick,3/19/15,In Progress
R10,Patrick,3/20/15,Finished
R10,Peter,3/21/15,Hold

Answer

You can use groupBy with collect_list to collect the status list per Work_Req_Id, along with a UDF that filters for the wanted statuses. The grouped dataframe is then joined back to the original dataframe.

Window functions aren't proposed here because Spark 1.6 doesn't support collect_list/collect_set in window operations.

// Spark 1.6: collect_list requires a HiveContext; `$` and toDF need the implicits
// (assumes an existing sqlContext in scope, as in spark-shell)
import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._

val df = Seq(
  ("R1", "John", "3/4/15", "In Progress"),
  ("R1", "George", "3/5/15", "In Progress"),
  ("R2", "Peter", "3/6/15", "In Progress"),
  ("R3", "Alaxender", "3/7/15", "Finished"),
  ("R3", "Alaxender", "3/8/15", "In Progress"),
  ("R4", "Patrick", "3/9/15", "Finished"),
  ("R4", "Patrick", "3/10/15", "Not Valid"),
  ("R5", "Peter", "3/11/15", "Finished"),
  ("R6", "", "3/12/15", "Not Valid"),
  ("R7", "George", "3/13/15", "Not Valid"),
  ("R7", "George", "3/14/15", "In Progress"),
  ("R8", "John", "3/15/15", "Finished"),
  ("R8", "John", "3/16/15", "Failed"),
  ("R9", "Alaxender", "3/17/15", "Finished"),
  ("R9", "John", "3/18/15", "Removed"),
  ("R10", "Patrick", "3/19/15", "In Progress"),
  ("R10", "Patrick", "3/20/15", "Finished"),
  ("R10", "Patrick", "3/21/15", "Hold")
).toDF("Work_Req_Id", "Assigned_To", "Date", "Status")

def wanted = udf(
  (statuses: Seq[String]) => statuses.contains("In Progress") &&
    (statuses.contains("Finished") || statuses.contains("Not Valid"))
)

val df2 = df.groupBy($"Work_Req_Id").agg(collect_list($"Status").as("Statuses")).
  where( wanted($"Statuses") ).
  drop("Statuses")  // Spark 1.6's drop takes a column name; drop(Column) only arrived in 2.0

df.join(df2, Seq("Work_Req_Id")).show

// +-----------+-----------+-------+-----------+
// |Work_Req_Id|Assigned_To|   Date|     Status|
// +-----------+-----------+-------+-----------+
// |         R3|  Alaxender| 3/7/15|   Finished|
// |         R3|  Alaxender| 3/8/15|In Progress|
// |         R7|     George|3/13/15|  Not Valid|
// |         R7|     George|3/14/15|In Progress|
// |        R10|    Patrick|3/19/15|In Progress|
// |        R10|    Patrick|3/20/15|   Finished|
// |        R10|      Peter|3/21/15|       Hold|
// +-----------+-----------+-------+-----------+
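The grouping-and-filter predicate is independent of Spark, so it can be sanity-checked with plain Scala collections. This is a hypothetical standalone sketch using only the (Work_Req_Id, Status) pairs from the input above; the names `rows`, `wanted`, and `qualifying` are illustrative:

```scala
// Status rows keyed by work request ID, taken from the input data.
val rows = Seq(
  ("R1", "In Progress"), ("R1", "In Progress"), ("R2", "In Progress"),
  ("R3", "Finished"),    ("R3", "In Progress"),
  ("R4", "Finished"),    ("R4", "Not Valid"),
  ("R5", "Finished"),    ("R6", "Not Valid"),
  ("R7", "Not Valid"),   ("R7", "In Progress"),
  ("R8", "Finished"),    ("R8", "Failed"),
  ("R9", "Finished"),    ("R9", "Removed"),
  ("R10", "In Progress"), ("R10", "Finished"), ("R10", "Hold")
)

// Same predicate as the UDF: 'In Progress' plus 'Finished' or 'Not Valid'.
def wanted(statuses: Seq[String]): Boolean =
  statuses.contains("In Progress") &&
    (statuses.contains("Finished") || statuses.contains("Not Valid"))

// Group by ID, keep only the IDs whose status list satisfies the predicate.
val qualifying = rows.groupBy(_._1)
  .filter { case (_, rs) => wanted(rs.map(_._2)) }
  .keySet
// qualifying == Set("R3", "R7", "R10")
```

This confirms that R3, R7, and R10 are the only qualifying orders, matching the joined Spark output above.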
