Spark SQL read.json reads JSON input twice

Problem description

I have a Spark SQL job that reads my S3 JSON file(s) into a DataFrame.

I then ran 2 SQL queries on that DataFrame and found that Spark SQL read my S3 JSON files twice before executing each of my queries.

If the DataFrame object is not being reused, it would be very costly...

Thanks for your help.

Here is my code snippet:

protected boolean doAggregations() throws IOException { 

    SQLContext sqlContext = getSQLContext(); 

    DataFrame edgeDataFrame = sqlContext.read().json(sourceDataDirectory); 


    edgeDataFrame.cache(); 

    getLogger().info("Registering and caching the table 'edgeData'"); 
    edgeDataFrame.registerTempTable("edgeData"); 

    String dateKey = DateTimeUtility.SECOND_FORMATTER.print(System.currentTimeMillis()); 

    for (Map.Entry<String, AggregationMetadata> entry : aggMetadataMap.entrySet()) { 
        String aggName = entry.getKey(); 
        String resultDir = getAggregationResultDirectory(aggName, dateKey); 
        String sql = entry.getValue().getSql(); 
        // The input file(s) are being read again and again instead of operating on the "edgeDataFrame" 
        DataFrame dataFrame = sqlContext.sql(sql); 
        dataFrame.write().format("json").save(resultDir); 
    } 
    return true; 
}

Answer

Your JSON files were read twice because Spark did not know the schema of the JSON and SQL requires a known schema. Therefore, Spark took a two-pass approach:

  1. Discover the schema of all JSON records as the union of the schemas of each JSON record (see the sketch just after this list).

  2. Load the data into an appropriately-configured data structure.
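
To make step 1 concrete, here is a minimal sketch (reusing the question's sqlContext; the file name, fields, and expected types are hypothetical) of how two records with different fields produce one union schema:

// Hypothetical two-record file "mixed.jsonlines"; the records only partially overlap:
//   {"category" : "A", "num" : 5}
//   {"category" : "B", "price" : 1.5}
DataFrame mixed = sqlContext.read().json("mixed.jsonlines");
mixed.printSchema();
// The discovered schema is the union of both records' fields -- roughly
// category (string), num (long), price (double), all nullable -- and a field
// that is missing from a record simply comes back as null for that row.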

Imagine you have the simple, one-line JSON file:

{"category" : "A", "num" : 5}

If you execute

sqlContext.read.json(path).saveAsTable("test")

in spark-shell you'll notice the two passes.

The first pass has a map phase that collects the discovered schema per partition, and a reduce phase that combines those schemas into the union schema for all partitions.

For the map phase, you'll see something like:

INFO DAGScheduler: Submitting Stage 11 (file:///home/ubuntu/test_data.jsonlines MapPartitionsRDD[28] at textFile at JSONRelation.scala:114), which has no missing parents

For the reduce phase, you'll see something like:

INFO SparkContext: Starting job: reduce at JsonRDD.scala:54

After that, when the schema is known, the actual loading of the JSON data will begin. This will only involve a map phase because there is no need to share information between partition processors once the schema is discovered.

You can see how Spark treats the data columns in the logs:

INFO ColumnChunkPageWriteStore: written 56B for [category] BINARY: 1 values, 11B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]
INFO ColumnChunkPageWriteStore: written 70B for [num] INT64: 1 values, 14B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]
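
Since the first pass exists only to infer the schema, one way to avoid it is to declare the schema yourself before reading. Below is a minimal sketch against the Spark 1.x Java API used in the question; the field list is illustrative and would have to match the real edge data:

import java.util.Arrays;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Declaring the schema up front lets read().json() skip the schema-discovery
// pass over the S3 files (field names here are only placeholders).
StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("category", DataTypes.StringType, true),
        DataTypes.createStructField("num", DataTypes.LongType, true)));

// sqlContext and sourceDataDirectory are the ones from the question's snippet.
DataFrame edgeDataFrame = sqlContext.read().schema(schema).json(sourceDataDirectory);
edgeDataFrame.cache();
edgeDataFrame.registerTempTable("edgeData");

The trade-off is that the schema is now maintained by hand: fields present in the JSON but absent from the declared schema are simply not loaded.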
