Accessing S3 using the S3a protocol from Spark with Hadoop version 2.7.2


Problem Description


I’m trying to access s3 (s3a protocol) from pyspark (version 2.2.0) and I’m having some difficulty.

I’m using the Hadoop and AWS SDK packages.

pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2

Here is what my code looks like:

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

rdd = sc.textFile('s3a://spark-test-project/large-file.csv')
print(rdd.first().show())

I get this:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1361, in first
    rs = self.take(1)
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1313, in take
    totalParts = self.getNumPartitions()
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 385, in getNumPartitions
    return self._jrdd.partitions().size()
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.partitions.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 32750D3DED4067BD, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: jAhO0tWTblPEUehF1Bul9WZj/9G7woaHFVxb8gzsOpekam82V/Rm9zLgdLDNsGZ6mPizGZmo6xI=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Is this a bug with the AWS Java SDK? I’m new to Spark, so I don’t know if there is a way to get better logging information from AWS other than AWS Error Code: null.

Solution

"Bad request" is the message to fear from S3, it means "This didn't work and we won't tell you why".

There's a whole section on troubleshooting S3A in the docs.

If your bucket is hosted somewhere which only supports the S3 "v4" auth protocol (Frankfurt, London, Seoul), then you need to set the fs.s3a.endpoint field to that of the specific region ... the doc has details.
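As a minimal sketch, reusing the hadoopConfiguration calls from the question (Frankfurt's endpoint is shown here as an assumed example; substitute your bucket's own region, and set it before the first s3a:// read, since Hadoop caches the FileSystem instance):

# Assumed example: the Frankfurt (eu-central-1) endpoint, a V4-only region.
# Replace it with the endpoint of whichever region actually hosts your bucket.
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")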

Otherwise, try using s3a://landsat-pds/scene_list.gz as a source. It's a public CSV file which doesn't need authentication. If you can't see it, then you are in serious trouble.
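A minimal check along those lines, assuming the same SparkContext sc as in the question, might be:

# Sketch: read the public landsat-pds listing. If this works, the S3A connector and
# classpath are fine, and the problem lies with the bucket, region or credentials.
rdd = sc.textFile('s3a://landsat-pds/scene_list.gz')
print(rdd.first())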
