Spark Scala read csv file using s3a


Question

I am trying to read a csv (native) file from an S3 bucket using Spark with Scala, running locally. I am able to read the file using the http protocol, but I intend to use the s3a protocol.

Below is the configuration set up before the call:

    val awsId = System.getenv("AWS_ACCESS_KEY_ID")
    val awsKey = System.getenv("AWS_SECRET_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3a.access.key", awsId)
    sc.hadoopConfiguration.set("fs.s3a.secret.key", awsKey)
    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
    sc.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "us-east-1.amazonaws.com")
    sc.hadoopConfiguration.set("fs.s3a.impl.disable.cache", "true")

Read the file and print the first 5 rows from the RDD/DataFrame:

    val fileAPath = Files.s3aPath(Files.input)
    println(s"reading file from s3: $fileAPath")
    // s3a://bucket-name/dataSets/policyoutput.csv
    val df = sc.textFile(fileAPath)
    df.take(5).foreach(println)

I get the following exception:

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: FD92FDC175C64AA2, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: IuloUEASgqnY4lrSMpbyJpwgFfCFbttxuxmJ9hGHMUgZTbO/UR/YyDgjix+3rBe0Y4MQHPzNvhA=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

Any help / direction for further investigation will be much appreciated.

Thanks

Accepted answer

For anyone else struggling with this: I had to update the version of hadoop-client.
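
Since mismatched hadoop-aws and aws-java-sdk jars are a common cause of this 400 error, it can help to confirm which jars the relevant classes are actually loaded from. The sketch below is a diagnostic added for illustration, not part of the original answer:

    // Print the jar each class was loaded from, to verify which hadoop-aws
    // and aws-java-sdk versions actually ended up on the classpath.
    println(classOf[org.apache.hadoop.fs.s3a.S3AFileSystem]
      .getProtectionDomain.getCodeSource.getLocation)
    println(classOf[com.amazonaws.services.s3.AmazonS3Client]
      .getProtectionDomain.getCodeSource.getLocation)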

Additionally, the link below was quite helpful:

https://disqus.com/by/cfeduke/

pom details below:

<properties>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.8.0</hadoop.version>
</properties>


<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

