Spark: How to parse multiple JSON with List of arrays of Struct?


Question


I am trying to get the average of the ratings of all JSON objects in a file. I loaded the file and converted it to a data frame, but I am getting an error while parsing for the average. Sample request:

{
        "country": "France",
        "customerId": "France001",
        "visited": [
            {
                "placeName": "US",
                "rating": "2.3",
                "famousRest": "N/A",
                "placeId": "AVBS34"

            },
              {
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"

            },
              {
                "placeName": "Canada",
                "rating": "4.3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"

            }        
    ]
}


So for this JSON, the US average rating will be (2.3 + 3.3)/2 = 2.8.

{
        "country": "Egypt",
        "customerId": "Egypt009",
        "visited": [
            {
                "placeName": "US",
                "rating": "1.3",
                "famousRest": "McDonald",
                "placeId": "Dedcf3"

            },
              {
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "EagleNest",
                "placeId": "CDfet3"

            }
    ]
}

{
        "country": "Canada",
        "customerId": "Canada012",
        "visited": [
            {
                "placeName": "UK",
                "rating": "3.3",
                "famousRest": "N/A",
                "placeId": "XSdce2"

            }
    ]
}


For this one, the US average = (1.3 + 3.3)/2 = 2.3.


So overall, the average rating will be (2.8 + 2.3)/2 = 2.55 (only two requests have 'US' in their visited list).

My schema:

root
|-- country: string (nullable = true)
|-- customerId: string (nullable = true)
|-- visited: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |   |-- placeId: string (nullable = true)
|    |   |-- placeName: string (nullable = true) 
|    |   |-- famousRest: string (nullable = true)
|    |   |-- rating: string (nullable = true)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
df.show() 


So basically I need to get the average of ratings where placeName = 'US'. That is, AVG_RATING = sum of the ratings in each JSON object where placeName is 'US' / count of such visited entries, and FINAL_VALUE = sum of the AVG_RATINGs of all JSON objects containing placeName 'US' / count of all JSON objects with placeName = 'US'.
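A minimal plain-Scala sketch of this two-step average, outside of Spark; the `Visit` and `Customer` case classes are hypothetical stand-ins for the parsed JSON rows, not part of the question's data frame:

```scala
// Hypothetical stand-ins for the parsed JSON rows.
case class Visit(placeName: String, rating: Double)
case class Customer(customerId: String, visited: Seq[Visit])

val customers = Seq(
  Customer("France001", Seq(Visit("US", 2.3), Visit("US", 3.3), Visit("Canada", 4.3))),
  Customer("Egypt009",  Seq(Visit("US", 1.3), Visit("US", 3.3))),
  Customer("Canada012", Seq(Visit("UK", 3.3)))
)

// AVG_RATING: per-customer average over 'US' visits only.
// Customers with no 'US' visit contribute nothing.
val avgRatings = customers.flatMap { c =>
  val us = c.visited.filter(_.placeName == "US").map(_.rating)
  if (us.isEmpty) None else Some(us.sum / us.size)
}

// FINAL_VALUE: average of the per-customer averages, (2.8 + 2.3) / 2 = 2.55.
val finalValue = avgRatings.sum / avgRatings.size
```

This is the "average of averages" reading of the question; a flat average over all 'US' visits would weight customers by how many 'US' entries they have.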

What I have tried so far:

    df.registerTempTable("people")
    sqlContext.sql("select avg(expResults.rank) from people LATERAL VIEW explode(visited) people AS expResults where expResults.placeName = 'US'").collect().foreach(println)

    val result = df.select("*").where(array_contains(df("visited.placeName"), "US"))

The second one gives the rows whose visited array contains 'US', but I am not sure how to parse through the list of structs.


Can someone tell me how to do this?

Answer

It looks like you want something like this:

import org.apache.spark.sql.functions.{avg, explode}
import sqlContext.implicits._  // for the $"col" syntax

val result = df
  .withColumn("visit", explode($"visited"))    // Explode visits
  .groupBy($"customerId", $"visit.placeName")  // Group by using dot syntax
  .agg(avg($"visit.rating".cast("double")).alias("tmp"))
  .groupBy($"placeName").agg(avg($"tmp").alias("value"))


After that you can filter this for a country of your choice.

result.where($"placeName" === "US").show
// +---------+-----+
// |placeName|value|
// +---------+-----+
// |       US| 2.55|
// +---------+-----+


A less elegant approach is to use a UDF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import scala.util.Try

def userAverage(country: String) = udf((visits: Seq[Row]) => Try {
   val filtered = visits
     .filter(_.getAs[String]("placeName") == country)
     .map(_.getAs[String]("rating").toDouble)
   filtered.sum / filtered.size
}.toOption)

df.select(userAverage("US")($"visited").as("tmp")).na.drop.agg(avg("tmp"))
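The per-row logic inside that UDF can be sketched in plain Scala, with hypothetical `(placeName, rating)` pairs standing in for Spark's `Row`:

```scala
import scala.util.Try

// Same filter/map/average logic as the UDF body above.
def userAverage(country: String)(visits: Seq[(String, String)]): Option[Double] = Try {
  val filtered = visits
    .filter(_._1 == country)       // keep only visits to `country`
    .map(_._2.toDouble)            // ratings are strings in the JSON
  filtered.sum / filtered.size
}.toOption

val ok  = userAverage("US")(Seq(("US", "2.3"), ("US", "3.3")))  // Some(2.8)
val bad = userAverage("US")(Seq(("US", "N/A")))                 // toDouble throws -> None
val nan = userAverage("US")(Seq(("UK", "3.3")))                 // 0.0 / 0 -> Some(NaN)
```

Note that a customer with no matching visits yields `Some(NaN)` (0.0 divided by an integer 0), not `None`; that is why the line above follows the select with `.na.drop` before aggregating.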


Note: This follows the description provided in the question by computing an average of averages, which differs from the accepted answer. For a simple average:

val result = df
  .select(explode($"visited").alias("visit"))
  .groupBy($"visit.placeName")
  .agg(avg($"visit.rating".cast("double")))

