How to process an array-of-JSON column in a Spark SQL DataFrame
Question
Input JSON
{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId |newSub |score|scoreBoard |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7 |A |abc@gmail.com|Environment|95 |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]] |abc |
|8 |A |xyz@gmail.com|Environment|95 |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]] |xyz |
|9 |A |efg@gmail.com|Environment|95 |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
Processing I want:
Add newSub as a JSON entry in the scoreBoard list, reading the data from the student's row (newSub, score, grade).
Sort the entries by score and remove the lowest-scoring entry from the scoreBoard list.
Expected output:
{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId |scoreBoard |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7 |abc@gmail.com|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc |
|8 |xyz@gmail.com|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]] |xyz |
|9 |efg@gmail.com|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
What I have tried:
First approach: process the column in a UDF, but sorting the scoreBoard entries and deleting one of them inside the UDF is challenging.
Second approach: explode the scoreBoard column, which gives 6 rows per student, one per subject. The challenge here is processing the data per group: how to add a new row for the new subject, sort each student's subject scores, and delete one row.
I need help choosing an approach, and would like to know whether there is a different, more efficient way to do the same processing. Thanks!
Recommended answer
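import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}
import org.apache.spark.sql.types.{ArrayType, LongType, StringType, StructField, StructType}
import ss.implicits._
// Return type of both UDFs: an array of (grade, score, subject) structs, matching scoreBoard.
val schema = ArrayType(StructType(Seq(
  StructField("grade", StringType, nullable = true),
  StructField("score", LongType, nullable = true),
  StructField("subject", StringType, nullable = true))), containsNull = true)
// Append the new subject struct to the existing scoreBoard array.
val addValue = udf((array: Seq[Row], newval: Row) => array :+ newval, schema)
// Sort by score, highest first, then drop the last (lowest-scoring) entry.
val sortAndRemove = udf((array: Seq[Row]) =>
  array.sortBy(_.getAs[Long]("score"))(Ordering[Long].reverse).dropRight(1), schema)
// Build the new entry from the row's own grade, score and newSub columns.
val df2 = df.withColumn("map_col", struct(col("grade"), col("score"), col("newSub").as("subject")))
  .withColumn("scoreBoard", sortAndRemove(addValue(col("scoreBoard"), col("map_col"))))
df2.select("scoreBoard").show(false)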
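This is the UDF approach, where ss is the SparkSession. On Spark 2.4 and later, addValue can be replaced with the built-in array_union, with map_col wrapped in array(...) so that both arguments are arrays; note that array_union also removes duplicate elements.
The code above works on Spark 2.0 and later. On Spark 3.0 and later, the untyped udf(f, dataType) overload is disabled by default and requires setting spark.sql.legacy.allowUntypedScalaUDF to true.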
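The logic inside the two UDFs can be checked without Spark by running it on plain Scala collections. The sketch below is only an illustration: the Entry case class, the ScoreBoardSketch object and the sampleBoard value are hypothetical names that are not part of the answer, and Entry stands in for the Row that Spark passes to the UDF.

```scala
// Hypothetical stand-in for one scoreBoard struct; Spark would pass a Row instead.
final case class Entry(grade: String, score: Long, subject: String)

object ScoreBoardSketch {
  // Mirrors addValue: append the new subject to the end of the list.
  def addValue(board: Seq[Entry], newEntry: Entry): Seq[Entry] =
    board :+ newEntry

  // Mirrors sortAndRemove: sort by score, highest first, then drop the
  // last element, which is the lowest-scoring one after sorting.
  def sortAndRemove(board: Seq[Entry]): Seq[Entry] =
    board.sortBy(_.score)(Ordering[Long].reverse).dropRight(1)

  // scoreBoard of student "abc" from the input JSON.
  val sampleBoard: Seq[Entry] = Seq(
    Entry("A", 90, "Math"),
    Entry("A", 82, "Science"),
    Entry("A", 80, "History"),
    Entry("B", 75, "Hindi"),
    Entry("A", 80, "English"),
    Entry("A", 80, "Geography")
  )

  def main(args: Array[String]): Unit = {
    // newSub, score and grade come from the student's own row.
    val result = sortAndRemove(addValue(sampleBoard, Entry("A", 95, "Environment")))
    result.foreach(println)
  }
}
```

For student abc this keeps Environment, Math, Science, History, English and Geography, and drops Hindi (score 75), which matches the first row of the expected output. Because sortBy is a stable sort, entries with equal scores keep their original relative order.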