How To Get Local Spark on AWS to Write to S3

Question

I have installed Spark 2.4.3 with Hadoop 3.2 on an AWS EC2 instance. I’ve been using Spark (mainly pyspark) in local mode with great success. It is nice to be able to spin up something small and then resize it when I need power, and do it all very quickly. When I really need to scale I can switch to EMR and go to lunch. It all works smoothly apart from one issue: I can’t get the local Spark to reliably write to S3 (I've been using local EBS space). This is clearly something to do with all the issues outlined in the docs about S3’s limitations as a file system. However, using the latest Hadoop, my reading of the docs is that I should be able to get it working.

Note that I'm aware of this other post, which asks a related question; there is some guidance there, but no solution that I can see: How to use new Hadoop parquet magic commiter to custom S3 server with Spark

I have the following settings (set in various places), following my best understanding of the documentation here: https://hadoop.apache.org/docs/r3.2.1/hadoop-aws/tools/hadoop-aws/index.html

fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem  
fs.s3a.committer.name: directory   
fs.s3a.committer.magic.enabled: false  
fs.s3a.committer.threads: 8 
fs.s3a.committer.staging.tmp.path: /cache/staging  
fs.s3a.committer.staging.unique-filenames: true  
fs.s3a.committer.staging.conflict-mode: fail  
fs.s3a.committer.staging.abort.pending.uploads: true  
mapreduce.outputcommitter.factory.scheme.s3a: org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory  
fs.s3a.connection.maximum: 200  
fs.s3a.fast.upload: true  
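
For reference, here is a minimal PySpark sketch (my addition, not the asker's actual code) of one way these options can be applied when building a local-mode session; the spark.hadoop. prefix forwards each key into the Hadoop configuration, and the values simply mirror the list above.

# Sketch only: applies the settings above to a local-mode SparkSession.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("s3a-committer-test")
    # spark.hadoop.* options are passed through to the Hadoop configuration
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
    .config("spark.hadoop.fs.s3a.committer.threads", "8")
    .config("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/cache/staging")
    .config("spark.hadoop.fs.s3a.committer.staging.unique-filenames", "true")
    .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "fail")
    .config("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true")
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    .config("spark.hadoop.fs.s3a.connection.maximum", "200")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .getOrCreate()
)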

A relevant point is that I’m saving using Parquet. I see that there were some problems with saving Parquet previously, but I don’t see this mentioned in the latest docs. Maybe this is the problem?

In any case, here is the error I’m getting, which seems indicative of the kind of error S3 gives when trying to rename the temporary folder. Is there some combination of correct settings that will make this go away?

java.io.IOException: Failed to rename S3AFileStatus{path=s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/_temporary/0/_temporary/attempt_20190910022011_0004_m_000118_248/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet; isDirectory=false; length=185052; replication=1; blocksize=33554432; modification_time=1568082036000; access_time=0; owner=brett; group=brett; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE to s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:473)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:486)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:597)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:560)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more

Answer

I helped @brettc with his configuration and we worked out the correct settings.

$SPARK_HOME/conf/spark-defaults.conf

# Enable the S3A file system to be recognised
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

# Parameters to use the new committers
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.committer.staging.conflict-mode replace
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class     org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

If you look at the last 2 configuration lines above, you will see that you need the org.apache.spark.internal.io library, which contains the PathOutputCommitProtocol and BindingParquetOutputCommitter classes. To get them you have to download the spark-hadoop-cloud jar (in our case we took version 2.3.2.3.1.0.6-1) and place it under $SPARK_HOME/jars/.
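
As a quick sanity check (a sketch added here, not part of the original answer), you can confirm from PySpark that the two classes shipped in spark-hadoop-cloud are visible to the JVM once the jar is under $SPARK_HOME/jars/:

# Sketch only: verify the committer classes named in the config are on the classpath.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
for cls in ("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"):
    # Class.forName raises an error (ClassNotFoundException) if the jar is missing
    spark._jvm.java.lang.Class.forName(cls)
    print("found", cls)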

You can easily verify that you are using the new committer by creating a parquet file. The _SUCCESS file should contain a JSON document like the one below:

{
  "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
  "timestamp" : 1574729145842,
  "date" : "Tue Nov 26 00:45:45 UTC 2019",
  "hostname" : "<hostname>",
  "committer" : "directory",
  "description" : "Task committer attempt_20191125234709_0000_m_000000_0",
  "metrics" : { [...] },
  "diagnostics" : { [...] },
  "filenames" : [...]
}
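
For illustration, a minimal PySpark sketch of that check (the bucket and path below are hypothetical): write a tiny DataFrame as parquet and print the start of the resulting _SUCCESS file, which with the S3A committers is a JSON manifest rather than the zero-byte marker left by the classic FileOutputCommitter.

# Sketch only: reuses the SparkSession built with the settings above.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.write.mode("overwrite").parquet("s3a://my-bucket/committer-check/")

# With the S3A committers, _SUCCESS holds a JSON SuccessData manifest.
success = spark.read.text("s3a://my-bucket/committer-check/_SUCCESS")
print("\n".join(row.value for row in success.collect())[:300])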
