Error trying to access AWS S3 using Pyspark

Problem description

I am trying to access gzip files on AWS S3 using Spark; I have a very simple script below. I first started off with an IAM user that has access permissions to the S3 bucket. Then I created an EC2 instance and installed Python and Spark. I set up the spark.properties file as below. I only copied the jar files and didn't bother to go through the entire Hadoop installation. Then I realized I had to create an IAM role for the EC2 instance to access S3, so I created an IAM role, attached an access policy, and then attached the role to the EC2 instance. I did not restart the EC2 instance. What am I doing wrong? My goal is to get comfortable with Pyspark in a standalone environment before I move on to EMR, clusters, etc.

I execute Pyspark as: spark-submit --properties-file spark.properties S3Access.py
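
(For reference, a minimal spark.properties for this kind of standalone s3a setup might look roughly like the following. The jar locations and versions here are illustrative assumptions, not the exact file used above.)

# Illustrative sketch only, not the original file.
# Put the S3A connector and the matching AWS SDK on the driver and executor classpaths.
spark.driver.extraClassPath    /opt/spark-jars/hadoop-aws-2.7.3.jar:/opt/spark-jars/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath  /opt/spark-jars/hadoop-aws-2.7.3.jar:/opt/spark-jars/aws-java-sdk-1.7.4.jar
# Any spark.hadoop.* property is forwarded into the Hadoop configuration.
spark.hadoop.fs.s3a.impl       org.apache.hadoop.fs.s3a.S3AFileSystem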

My Pyspark code:

import os.path
from pathlib import Path
from pyspark import SparkContext, SparkConf
from boto3.session import Session

ACCESS_KEY = 'blah blah'
SECRET_KEY = 'blah blah'
BUCKET_NAME = 'bucket'
PREFIX = 'folder-name/'
MAX_FILES_READ = 3

if __name__ == "__main__":
        # Use Boto to connect to S3 and get a list of objects from a bucket
        session = Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)

        s3 = session.resource('s3')

        # call S3 to list current buckets
        my_bucket = s3.Bucket(BUCKET_NAME)

        # Get a Spark context and use it to parallelize the keys
        conf = SparkConf().setAppName("MyFirstProcessingApp")
        sc = SparkContext(conf=conf)

        index = 0
        for s3_file in my_bucket.objects.filter(Prefix=PREFIX):
                if 'gz' in s3_file.key:
                        index += 1
                        print ("Found file: ", s3_file.key)
                        if index == MAX_FILES_READ:
                                break
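                        # Note: the object key found by the loop (s3_file.key) is not used below;
                        # fileLocation is hard-coded to a fixed path instead.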
                        fileLocation = "s3a://" + BUCKET_NAME + '/path-to-file/path/filename.txt'
                        print ("file location: ", fileLocation)
                        s3File = sc.textFile(fileLocation)
                        count = s3File.count()

Error I get:

ubuntu@ip-172-31-57-35:/opt/iqmedia$ spark-submit --properties-file spark.properties S3Access.py
19/07/22 01:15:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found file:  inscape/content/2019-01-01/2019-01-01-07.0000_part_00.gz
file location:  s3a://bucket/folder/filename.txt
Traceback (most recent call last):
  File "/opt/iqmedia/S3Access.py", line 42, in <module>
    count = s3File.count()
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 35CB499B1AE1A8A6, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: BHnH2DO+HuvARy9d3hdfCrtc2ToSJ7DQ/6ODSymLfDOZF7G80rpJqyyvkVuXdAPsR2a9gjqxWX8=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
        at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
        at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:259)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

What am I doing wrong? Any help is appreciated. Thanks.

Update, July 22: I did everything as mentioned above except running the notebook. I ran the script again and got the following error. Any thoughts?

File "/opt/iqmedia/S3Access.py", line 39, in <module>
    print(s3File.count())
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
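
(For context: this ClassNotFoundException generally means the hadoop-aws connector jar, which provides org.apache.hadoop.fs.s3a.S3AFileSystem, is not on the classpath. One common way to supply it for a Hadoop 2.7 build of Spark, shown only as an illustration rather than the exact command used here, is to let spark-submit pull it from Maven:

spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.3 --properties-file spark.properties S3Access.py

The AWS SDK it needs comes in as a transitive dependency.)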

Update, July 22 evening: Well, I got past everything; I think I was using the wrong Hadoop jar file. My .bashrc now looks like this, even though I still don't have the notebook working.

export JAVA_HOME=/usr
export SPARK_HOME=/opt/apache-spark/spark-2.4.3-bin-hadoop2.7
export HADOOP_HOME=/opt/apache-spark/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin:/home/ubuntu/anaconda3/bin:$HADOOP_HOME/bin
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYSPARK_PYTHON=python2.7
export PYSPARK_DRIVER_PYTHON=python2.7
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

Now I am able to run a simple Spark command to measure the size of a file, but I am still getting the warning below.

NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Any thoughts?

Solution

I'd suggest you go via the route I'm mentioning below, because I've faced issues with S3 and PySpark in the past, and whatever I did wasn't good for my head or for the wall.

  1. Download Spark locally (version 2.4.x, prebuilt for Hadoop 2.7).
  2. Set your environment variables (e.g. SPARK_HOME) and add them to PATH.
    • On macOS I add them to ~/.bash_profile. It should be similar for other OSs.
  3. Download hadoop-aws-2.7.3.jar and aws-java-sdk-1.7.4.jar and save them inside some folder (e.g. /users/me/test-spark). One thing to remember here is that if you use any other versions, you'll be in agony.
  4. Add these two lines to your spark-defaults.conf file, which you can find inside the Spark installation path:

spark.driver.extraClassPath /users/me/test-spark/hadoop-aws-2.7.3.jar:/users/me/test-spark/aws-java-sdk-1.7.4.jar

spark.executor.extraClassPath /users/me/test-spark/hadoop-aws-2.7.3.jar:/users/me/test-spark/aws-java-sdk-1.7.4.jar

  5. Download Anaconda; Jupyter comes bundled with it, so no headache. Configure your Jupyter to bind to the particular Python and Spark on your local machine.
    • On macOS I add these variables, with appropriate values, in ~/.bash_profile: PYTHONPATH, PYSPARK_PYTHON, PYSPARK_DRIVER_PYTHON, PYSPARK_DRIVER_PYTHON_OPTS. You can find tutorials online that show how to set these values; a rough sketch follows below.
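
(As an illustration only, since the exact paths depend on where Spark and Anaconda are installed on your machine, those ~/.bash_profile entries might look something like this:)

export SPARK_HOME=/users/me/spark-2.4.3-bin-hadoop2.7
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH
export PYSPARK_PYTHON=/users/me/anaconda3/bin/python
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"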


Once you've done all these prerequisites, you can move on to the next stage:

  1. Paste the following code snippet into your notebook and run it. I think your code had an issue where the file name is hard-coded, so it points to a non-existent object in S3.

import os.path
from pathlib import Path
from pyspark.sql import SparkSession
from boto3.session import Session

ACCESS_KEY = 'blah blah blah?'
SECRET_KEY = 'blah blah blah!'
BUCKET_NAME = 'my-leaky-bucket'
PREFIX = 'root'
MAX_FILES_READ = 3

# Use Boto to connect to S3 and get a list of objects from a bucket
session = Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)

s3 = session.resource('s3')

# call S3 to list current buckets
my_bucket = s3.Bucket(BUCKET_NAME)

spark = SparkSession.builder.appName('MyFirstProcessingApp').master('local[2]').getOrCreate()

sc = spark.sparkContext
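# Hand the key pair to the S3A connector and point Hadoop at the S3AFileSystem
# implementation that ships in the hadoop-aws jar added to the classpath above.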
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

index = 0
for s3_file in my_bucket.objects.filter(Prefix=PREFIX):
    if 'gz' in s3_file.key:
        index += 1
        print ("Found file: {file}".format(file=s3_file.key))
        if index == MAX_FILES_READ:
            break
        fileLocation = "s3a://{bucket}/{file}".format(bucket=BUCKET_NAME,file=s3_file.key)
        print ("file location: {loc}".format(loc=fileLocation))
        s3File = sc.textFile(fileLocation)
        print(s3File.count())
        print('\n')
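
As a side note (not part of the original answer, just an alternative that works under the same s3a configuration), the same object can also be read through the DataFrame API:

df = spark.read.text(fileLocation)  # one row per line, in a single string column named "value"
print(df.count())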

Additional note: it is kind of amazing that if you create an EMR cluster and attach a Jupyter notebook to it (from the AWS web UI), it takes care of everything. You can simply copy-paste the code snippet into that Jupyter notebook and you're good to go.
