如何处理Spark SQL DataFrame中的JSON列数组 [英] How to Process array of json column in spark sql dataframe
问题描述
输入Json
{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId |newSub |score|scoreBoard |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7 |A |abc@gmail.com|Environment|95 |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]] |abc |
|8 |A |xyz@gmail.com|Environment|95 |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]] |xyz |
|9 |A |efg@gmail.com|Environment|95 |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
我想要的处理-
-
添加newSub的json是scoreBoard列表(从用户行读取数据-newSub,成绩,等级)
add newSub's json is scoreBoard list (read data from user row - newSub, score, grade)
根据分数对它们进行排序,并从分数更少的scoreBoard列表中删除json
sort them on score and remove the json from scoreBoard list having less score
预期输出-
{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId |scoreBoard |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7 |abc@gmail.com|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc |
|8 |xyz@gmail.com|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]] |xyz |
|9 |efg@gmail.com|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
我尝试了
第一种方法-UDF处理,但是从UDF中的scoreBoard列中排序和删除json很有挑战性
1st way - UDF processing but Sorting and deleting json from scoreBoard column in UDF is challenging
第二种方法-展开列评分板,单身学生获得6行,每门学科各获得6行.我面临的挑战是如何明智地处理数据组,就像如何为新主题添加新行,对每个用户的主题分数进行排序并删除一行.
2nd way - explode the column scoreBoard, got 6 row for single student, each for every subject. Challenge I am facing in this is, how to process data group wise, Like how to add new row for new Subject,sort each user's subject score and delete one row.
如果有人知道有任何新的/不同的有效方法来执行相同的处理,则需要帮助选择解决此问题的方法.谢谢!
Need help to select way to solve this problem, if anyone know is there any new/different efficient way to do the same processing. Thanks!!
推荐答案
import ss.implicits._
val schema = new ArrayType(new StructType(Array(
StructField("grade",DataTypes.StringType,true),
StructField("score",DataTypes.LongType,true),
StructField("subject",DataTypes.StringType,true))),true)
def addValue = udf((array: Seq[Row], newval:Row)=> array ++ Array(newval),schema)
def sortAndRemove = udf((array: Seq[Row])=> array.sortBy(x=>x.getAs[Long]("score"))(Ordering[Long].reverse).slice(0,array.length-1),schema)
val df2 = df.withColumn("map_col",struct(col("grade"),col("score"),col("newSub").as("subject")))
.withColumn("scoreBoard",sortAndRemove(addValue(col("scoreBoard"),col("map_col"))))
df2.select("scoreBoard").show(false)
UDF方法,其中ss是SparkSession.如果使用Spark 2.4及更高版本,则可以用array_union替换addvalue.
UDF approach, where ss is SparkSession. addvalue can be replaced with array_union if using spark version 2.4 and above.
以上代码适用于spark 2.0及更高版本
Above code will work for spark 2.0 and above
这篇关于如何处理Spark SQL DataFrame中的JSON列数组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!