Amazon s3a returns 400 Bad Request with Spark-redshift library


Problem Description

I am facing a java.io.IOException: s3n://bucket-name : 400 : Bad Request error while loading Redshift data through the spark-redshift library:

The Redshift cluster and the S3 bucket are both in the Mumbai (ap-south-1) region, which accepts only AWS signature version 4 requests.

Here is the complete error stack trace:

2017-01-13 13:14:22 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, master): java.io.IOException: s3n://bucket-name : 400 : Bad Request
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:453)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
            at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
            at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476)
            at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:115)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:92)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:80)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:279)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:263)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
            at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
            at org.apache.spark.scheduler.Task.run(Task.scala:86)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.jets3t.service.impl.rest.HttpException: 400 Bad Request
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
            ... 30 more

And here is my Java code for the same:

SparkContext sparkContext = SparkSession.builder().appName("CreditModeling").getOrCreate().sparkContext();
// Maps the s3a:// scheme to the JetS3t-based NativeS3FileSystem (normally behind
// s3n://), which is why the exception above reports an s3n:// URI.
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", fs_s3a_awsSecretAccessKey);
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");

SQLContext sqlContext = new SQLContext(sparkContext);
Dataset<Row> dataset = sqlContext
        .read()
        .format("com.databricks.spark.redshift")
        .option("url", redshiftUrl)
        .option("query", query)
        .option("aws_iam_role", aws_iam_role)
        .option("tempdir", "s3a://bucket-name/temp-dir")
        .load();

I was able to solve the problem in Spark local mode by making the following changes (referring to this link):

1) I replaced the jets3t jar with version 0.9.4 (Maven coordinates net.java.dev.jets3t:jets3t:0.9.4).

2) I changed the jets3t configuration properties to support signature-version-4 buckets, as follows:

// Jets3tProperties.getInstance() returns the JVM-wide JetS3t configuration
// singleton; properties set here apply to all later JetS3t requests in this JVM.
Jets3tProperties myProperties = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
// Sign requests with AWS signature version 4, required by ap-south-1.
myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");

But now I am trying to run the job in cluster mode (Spark standalone, or with Mesos as the resource manager), and the error appears again.

Any help would be appreciated!

Recommended Answer

The actual problem:

Updating the Jets3tProperties at runtime to support AWS S3 signature version 4 worked in local mode but not in cluster mode, because the properties were only being updated on the driver JVM and not on any of the executor JVMs.
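
To make the failure mode concrete: JetS3t keeps its configuration in a per-JVM singleton, so a driver-side update like the one below never reaches the executor processes. (A minimal illustration, not part of the original fix.)

import org.jets3t.service.Constants;
import org.jets3t.service.Jets3tProperties;

// Driver JVM only: getInstance() returns this JVM's configuration singleton,
// so the V4 signature setting below is visible to the driver process alone.
Jets3tProperties props = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
props.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");

// Each executor is a separate JVM that loads its own singleton from the
// jets3t.properties file on its classpath (or built-in defaults), so it
// keeps signing S3 requests with the old signature version.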

Solution:

I found a workaround to update the Jets3tProperties on all executors by referring to this link.

Referring to the link above, I put an additional code snippet that updates the Jets3tProperties inside a .foreachPartition() call, so that it runs inside every executor JVM that processes a partition of the dataset.

Here is the code:

Dataset<Row> dataset = sqlContext
        .read()
        .format("com.databricks.spark.redshift")
        .option("url", redshiftUrl)
        .option("query", query)
        .option("aws_iam_role", aws_iam_role)
        .option("tempdir", "s3a://bucket-name/temp-dir")
        .load();

dataset.foreachPartition(partition -> {
    // This runs inside the executor JVM, so the JetS3t singleton gets updated
    // wherever partitions are actually processed. Re-running it for every
    // partition is harmless: getInstance() always returns the same per-JVM instance.
    Jets3tProperties myProperties =
            Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
    myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
    myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
    myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
});
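
A side note that goes beyond the original fix: if hadoop-aws 2.7+ (with its matching aws-java-sdk jar) is on the classpath of the driver and every executor, the s3a:// scheme can instead be served by Hadoop's own S3AFileSystem, which signs requests with signature version 4 once a regional endpoint is set, so no JetS3t tuning is needed at all. A minimal sketch under that assumption:

// Sketch only; assumes hadoop-aws 2.7+ and a matching aws-java-sdk jar are
// deployed to the driver and all executors.
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.access.key", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.secret.key", fs_s3a_awsSecretAccessKey);
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");

Because Spark serializes the Hadoop configuration to the executors with each job, these settings take effect cluster-wide without the foreachPartition workaround.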

