Spark SQL unable to complete writing Parquet data with a large number of shards


Problem description

I am trying to use Apache Spark SQL to ETL JSON log data in S3 into Parquet files, also on S3. My code is basically:

import org.apache.spark._

val sqlContext = new sql.SQLContext(sc)
// Infer the schema from a small sample of the JSON logs (samplingRatio = 10e-6)
val data = sqlContext.jsonFile("s3n://...", 10e-6)
// Write the result back to S3 as Parquet
data.saveAsParquetFile("s3n://...")

This code works when I have up to 2000 partitions and fails for 5000 or more, regardless of the volume of data. Normally one could just coalesce the partitions to an acceptable number, but this is a very large data set, and at 2000 partitions I hit the problem described in this question

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
        at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
        at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
        at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
        at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56)
        at $line37.$read$$iwC$$iwC.<init>(<console>:58)
        at $line37.$read$$iwC.<init>(<console>:60)
        at $line37.$read.<init>(<console>:62)
        at $line37.$read$.<init>(<console>:66)
        at $line37.$read$.<clinit>(<console>)
        at $line37.$eval$.<init>(<console>:7)
        at $line37.$eval$.<clinit>(<console>)
        at $line37.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
        at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

I am running this on spark-1.1.0 on an R3.xlarge in EC2, using the spark-shell console to run the above code. I am able to perform non-trivial queries on the data SchemaRDD object afterwards, so it does not appear to be a resource issue. It is also possible to read and query the resulting Parquet file; it just takes an extremely long time due to the lack of summary files.
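
For reference, the coalesce workaround mentioned above would look roughly like the sketch below. It assumes SchemaRDD's coalesce override (which keeps the schema) is available in this Spark version; as noted, it is not a viable option for this data set.

// Reduce the number of output shards before writing, so the job commit has
// fewer part-file footers to read back. Not practical here at this scale.
val coalesced = data.coalesce(2000, shuffle = false)
coalesced.saveAsParquetFile("s3n://...")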

Recommended answer

Try setting this property to false:

sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");
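
In the spark-shell session from the question, the equivalent would be roughly the following sketch (paths elided as in the question):

// Disable writing the Parquet summary file on job commit. The failing footer
// read in ParquetOutputCommitter.commitJob (see the stack trace) happens while
// building that summary, so disabling it avoids the error.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

// Then write as before.
data.saveAsParquetFile("s3n://...")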
