How to specify only particular fields using read.schema in JSON : SPARK Scala


Problem Description

I am trying to programmatically enforce a schema on a textFile whose contents look like JSON. I tried jsonFile, but the issue is that to create a DataFrame from a list of JSON files, Spark has to make one pass through the data to infer the schema for the DataFrame. So it needs to parse all of the data, which takes a long time (4 hours, since my data is zipped and terabytes in size). Instead I want to read it as a textFile and enforce a schema that picks out only the fields I am interested in, so that I can query the resulting DataFrame later. But I am not sure how to map that onto the input. Can someone give me a reference on how to map a schema onto JSON-like input?

Input:

This is the complete schema:

records: org.apache.spark.sql.DataFrame = [country: string, countryFeatures: string, customerId: string, homeCountry: string, homeCountryFeatures: string, places: array<struct<freeTrial:boolean,placeId:string,placeRating:bigint>>, siteName: string, siteId: string, siteTypeId: string, Timestamp: bigint, Timezone: string, countryId: string, pageId: string, homeId: string, pageType: string, model: string, requestId: string, sessionId: string, inputs: array<struct<inputName:string,inputType:string,inputId:string,offerType:string,originalRating:bigint,processed:boolean,rating:bigint,score:double,methodId:string>>] 

But I am only interested in a few fields, such as:

res45: Array[String] = Array({"requestId":"bnjinmm","siteName":"bueller","pageType":"ad","model":"prepare","inputs":[{"methodId":"436136582","inputType":"US","processed":true,"rating":0,"originalRating":1},{"methodId":"23232322","inputType":"UK","processed":falase,"rating":0,"originalRating":1}]


 val  records = sc.textFile("s3://testData/sample.json.gz")

  val schema = StructType(Array(StructField("requestId",StringType,true),
                          StructField("siteName",StringType,true),
                          StructField("model",StringType,true),
                          StructField("pageType",StringType,true),
                          StructField("inputs", ArrayType(
                                StructType(
                                            StructField("inputType",StringType,true), 
                                            StructField("originalRating",LongType,true), 
                                            StructField("processed",BooleanType,true), 
                                            StructField("rating",LongType,true), 
                                            StructField("methodId",StringType,true)
                                            ),true),true)))

    val rowRDD = ?? 

    val inputRDD = sqlContext.applySchema(rowRDD, schema)
    inputRDD.registerTempTable("input")

     sql("select * from input").foreach(println)

Is there any way to map this? Or do I need to use a JSON parser or something? I want to use textFile only because of the constraints.

Tried:

val records = sqlContext.read.schema(schema).json("s3://testData/test2.gz")

But I keep getting this error:

<console>:37: error: overloaded method value apply with alternatives:
     (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
      (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
      (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
     cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
           StructField("inputs",ArrayType(StructType(StructField("inputType",StringType,true), StructField("originalRating",LongType,true), StructField("processed",BooleanType,true), StructField("rating",LongType,true), StructField("score",DoubleType,true), StructField("methodId",StringType,true)),true),true)))
                                              ^

Answer

The data can be loaded with the following code using a predefined schema; Spark does not need to scan the whole gzipped file to infer one. The code in the question does not compile because the nested StructType is passed its StructFields as separate arguments rather than wrapped in an Array (or Seq), which is exactly what the error message reports.

import org.apache.spark.sql.types._

val input = StructType(
                Array(
                    StructField("inputType",StringType,true), 
                    StructField("originalRating",LongType,true), 
                    StructField("processed",BooleanType,true), 
                    StructField("rating",LongType,true), 
                    StructField("score",DoubleType,true), 
                    StructField("methodId",StringType,true)
                )
            )

 val schema = StructType(Array(
    StructField("requestId",StringType,true),
    StructField("siteName",StringType,true),
    StructField("model",StringType,true),
    StructField("inputs",
        ArrayType(input,true),
                true)
    )
)

val records = sqlContext.read.schema(schema).json("s3://testData/test2.gz")
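
A quick way to sanity-check the result (a minimal sketch using standard DataFrame calls; records is the value defined just above):

    records.printSchema()   // only the fields declared in schema should appear
    records.show(5)         // peek at a few parsed rows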

Not all of the fields need to be provided, although it is good to provide all of them if possible.
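
For the later querying the question mentions, here is a minimal sketch of reaching the nested inputs array. It assumes org.apache.spark.sql.functions.explode is available in your Spark version and reuses the temp-table name "input" from the question:

    import org.apache.spark.sql.functions.explode

    // Query top-level fields through a temp table, as in the question.
    records.registerTempTable("input")
    sqlContext.sql("select requestId, siteName, model from input").show()

    // Flatten the nested inputs array to reach the element fields.
    records.select(explode(records("inputs")).as("input"))
           .select("input.inputType", "input.rating", "input.processed")
           .show()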

Spark does its best to parse every row; if a row is not valid, it adds a _corrupt_record column containing the whole raw line. The same applies when the input is a plain (uncompressed) JSON file.
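
Note that when an explicit schema is supplied, corrupt rows are, as far as I know (worth verifying on your Spark version), only kept if the schema also declares that column. A minimal sketch, reusing the schema and path from above:

    import org.apache.spark.sql.types.{StructField, StringType, StructType}

    // Append the corrupt-record column (default name _corrupt_record) to the schema.
    val schemaWithCorrupt = StructType(
        schema.fields :+ StructField("_corrupt_record", StringType, true))

    val checked = sqlContext.read.schema(schemaWithCorrupt).json("s3://testData/test2.gz")

    // Rows that failed to parse carry the whole raw line here and nulls in the other fields.
    checked.filter("_corrupt_record is not null").show()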

