Spark SQL read.json reads JSON input twice


Problem description

I have a Spark SQL job that reads my S3 JSON file(s) into a DataFrame.

I then run 2 SQL queries on that DataFrame and found that Spark SQL reads my S3 JSON files twice, once before executing each of my queries.

If the DataFrame object is not being reused, this would be very costly...

Any help is appreciated.

Here is my code snippet:

protected boolean doAggregations() throws IOException {
    SQLContext sqlContext = getSQLContext();
    DataFrame edgeDataFrame = sqlContext.read().json(sourceDataDirectory);

    edgeDataFrame.cache();

    getLogger().info("Registering and caching the table 'edgeData'");
    edgeDataFrame.registerTempTable("edgeData");

    String dateKey = DateTimeUtility.SECOND_FORMATTER.print(System.currentTimeMillis());

    for (Map.Entry<String, AggregationMetadata> entry : aggMetadataMap.entrySet()) {
        String aggName = entry.getKey();
        String resultDir = getAggregationResultDirectory(aggName, dateKey);
        String sql = entry.getValue().getSql();
        // The input file(s) are being read again and again instead of operating on the "edgeDataFrame"
        DataFrame dataFrame = sqlContext.sql(sql);
        dataFrame.write().format("json").save(resultDir);
    }
    return true;
}

Answer

Your JSON files were read twice because Spark did not know the schema of the JSON, and SQL requires a known schema. Therefore, Spark takes a two-pass approach:

  1. Discover the schema of all the JSON records as the union of the schemas of the individual JSON records (illustrated in the sketch after this list).

  2. Load the data into an appropriately-configured data structure.
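
As an illustration of the schema union in step 1 (not part of the original answer; the file path and field names here are made up), inferring the schema over two records with different fields yields the combined set of fields:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Assumes an existing SQLContext, as returned by getSQLContext() in the question.
// /tmp/example.jsonlines is a hypothetical file containing two records
// with different fields:
//   {"category": "A", "num": 5}
//   {"category": "B", "flag": true}
DataFrame df = sqlContext.read().json("/tmp/example.jsonlines");
df.printSchema();
// The inferred schema is the union of both records' fields, roughly:
// root
//  |-- category: string (nullable = true)
//  |-- flag: boolean (nullable = true)
//  |-- num: long (nullable = true)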

Imagine you have the simple, one-line JSON file:

{"category" : "A", "num" : 5}

If you run

sqlContext.read.json(path).saveAsTable("test")

in spark-shell, you'll notice the two passes.

The first pass has a map phase that collects the discovered schema for each partition, and a reduce phase that combines those schemas into the union schema across all partitions.

For the map phase, you'll see something like:

INFO DAGScheduler: Submitting Stage 11 (file:///home/ubuntu/test_data.jsonlines MapPartitionsRDD[28] at textFile at JSONRelation.scala:114), which has no missing parents

For the reduce phase, you'll see something like:

INFO SparkContext: Starting job: reduce at JsonRDD.scala:54

After that, once the schema is known, the actual loading of the JSON data begins. This involves only a map phase, because once the schema has been discovered there is no need to share information between the partition processors.

You can see how Spark treats the data columns in the logs:

INFO ColumnChunkPageWriteStore: written 56B for [category] BINARY: 1 values, 11B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]
INFO ColumnChunkPageWriteStore: written 70B for [num] INT64: 1 values, 14B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]
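
Not part of the original answer, but as a minimal sketch of one way to avoid the discovery pass, assuming the Spark 1.x Java API used in the question and that the JSON layout is known up front: the schema can be supplied explicitly via DataFrameReader.schema(...), so only the loading pass touches the input.

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical schema matching the one-line example file above;
// replace the fields with your own JSON layout.
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("category", DataTypes.StringType, true),
    DataTypes.createStructField("num", DataTypes.LongType, true)
});

// With an explicit schema, Spark does not need the schema-discovery pass
// and only scans the JSON input when the data itself is loaded.
DataFrame edgeDataFrame = sqlContext.read().schema(schema).json(sourceDataDirectory);

Combined with the edgeDataFrame.cache() call already in the question's code, the input files should then only be scanned for the single loading pass.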
