How to Process array of json column in spark sql dataframe


Problem description

Input JSON

{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId       |newSub     |score|scoreBoard                                                                                      |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7    |A    |abc@gmail.com|Environment|95   |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]]   |abc        |
|8    |A    |xyz@gmail.com|Environment|95   |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]]   |xyz        |
|9    |A    |efg@gmail.com|Environment|95   |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg        |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+

Processing I want:

  1. Add newSub as a JSON entry in the scoreBoard list (building it from the row's newSub, score, and grade fields).

  2. Sort the entries by score and remove the entry with the lowest score from the scoreBoard list.

Expected output -

{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId       |scoreBoard                                                                                         |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7    |abc@gmail.com|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc        |
|8    |xyz@gmail.com|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]]  |xyz        |
|9    |efg@gmail.com|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg        |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+

What I tried

1st way - UDF processing, but sorting the scoreBoard column and deleting a JSON entry from it inside a UDF is challenging.

2nd way - explode the scoreBoard column, which gives 6 rows per student, one for each subject. The challenge here is how to process the data group-wise: how to add a new row for the new subject, sort each user's subject scores, and delete one row.

I need help choosing a way to solve this problem, or, if anyone knows one, a new/different efficient way to do the same processing. Thanks!!
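The per-row transformation both approaches need can be sketched in plain Scala. This is only an illustration of the logic, using a hypothetical Entry case class in place of the structs inside the scoreBoard column:

```scala
// Hypothetical case class standing in for the scoreBoard struct entries.
case class Entry(subject: String, score: Long, grade: String)

def updateScoreBoard(board: Seq[Entry], newEntry: Entry): Seq[Entry] = {
  // 1. Append the new subject's entry.
  val withNew = board :+ newEntry
  // 2. Sort descending by score, then drop the last (lowest-scoring) entry.
  withNew.sortBy(_.score)(Ordering[Long].reverse).dropRight(1)
}

val board = Seq(
  Entry("Math", 90, "A"), Entry("Science", 82, "A"),
  Entry("History", 80, "A"), Entry("Hindi", 75, "B"),
  Entry("English", 80, "A"), Entry("Geography", 80, "A"))

val updated = updateScoreBoard(board, Entry("Environment", 95, "A"))
// Hindi (75) is dropped; Environment (95) moves to the front.
```

Whatever Spark mechanism is chosen (UDF or built-in array functions), it has to express exactly this append-sort-drop sequence per row.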

Recommended answer

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import ss.implicits._

// Schema of the scoreBoard column: an array of (grade, score, subject) structs.
val schema = new ArrayType(new StructType(Array(
  StructField("grade", DataTypes.StringType, true),
  StructField("score", DataTypes.LongType, true),
  StructField("subject", DataTypes.StringType, true))), true)

// Append the new subject's struct to the scoreBoard array.
def addValue = udf((array: Seq[Row], newval: Row) => array ++ Array(newval), schema)

// Sort descending by score and drop the last (lowest-scoring) entry.
def sortAndRemove = udf((array: Seq[Row]) =>
  array.sortBy(x => x.getAs[Long]("score"))(Ordering[Long].reverse).slice(0, array.length - 1), schema)

val df2 = df.withColumn("map_col", struct(col("grade"), col("score"), col("newSub").as("subject")))
  .withColumn("scoreBoard", sortAndRemove(addValue(col("scoreBoard"), col("map_col"))))
df2.select("scoreBoard").show(false)

UDF approach, where ss is the SparkSession. addValue can be replaced with array_union if using Spark version 2.4 and above.

The above code will work for Spark 2.0 and above.
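Building on the array_union suggestion, the whole transformation can be done without UDFs on newer Spark versions. This is a sketch, assuming Spark 3.0+ (the comparator-lambda form of array_sort needs 3.0; array_union needs 2.4), and it requires a live SparkSession with the df from the question:

```scala
import org.apache.spark.sql.functions._

val df2 = df
  // Build the new subject's struct from the row's top-level fields.
  .withColumn("newEntry", struct(col("grade"), col("score"), col("newSub").as("subject")))
  // Append it to scoreBoard. Note array_union also de-duplicates;
  // use concat(col("scoreBoard"), array(col("newEntry"))) to keep duplicates.
  .withColumn("scoreBoard", array_union(col("scoreBoard"), array(col("newEntry"))))
  // Sort descending by score with a comparator lambda (Spark 3.0+).
  .withColumn("scoreBoard",
    expr("array_sort(scoreBoard, (a, b) -> int(b.score - a.score))"))
  // Drop the last (lowest-scoring) entry.
  .withColumn("scoreBoard",
    expr("slice(scoreBoard, 1, size(scoreBoard) - 1)"))
  .drop("newEntry", "newSub", "grade", "score")
```

Keeping everything in built-in functions lets Catalyst optimize the plan and avoids the serialization overhead of UDFs.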
