Premature end of Content-Length delimited message body SparkException while reading from S3 using Pyspark


Problem Description

I am using the code below to read a CSV file from S3 on my local machine.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import configparser
import os

conf = SparkConf()
conf.set('spark.jars', '/usr/local/spark/jars/aws-java-sdk-1.7.4.jar,/usr/local/spark/jars/hadoop-aws-2.7.4.jar')

# Tried setting these, but it did not help
conf.set('spark.executor.memory', '8g') 
conf.set('spark.driver.memory', '8g') 

spark_session = SparkSession.builder \
        .config(conf=conf) \
        .appName('s3-write') \
        .getOrCreate()

# getting S3 credentials from file
aws_profile = "lijo" #user profile name
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_key = config.get(aws_profile, "aws_access_key_id") 
secret_key = config.get(aws_profile, "aws_secret_access_key")

# hadoop configuration for S3
hadoop_conf=spark_session._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Tried setting these as well, but it made no difference
hadoop_conf.set("fs.s3a.connection.maximum", "1000") 
hadoop_conf.set("fs.s3.maxConnections", "1000") 
hadoop_conf.set("fs.s3a.connection.establish.timeout", "50000") 
hadoop_conf.set("fs.s3a.socket.recv.buffer", "8192000") 
hadoop_conf.set("fs.s3a.readahead.range", "32M")

# 1) Read csv
df = spark_session.read.csv("s3a://pyspark-lijo-test/auction.csv", header=True,mode="DROPMALFORMED")
df.show(2)

Below are my Spark standalone configuration details.

[('spark.driver.host', '192.168.0.49'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 's3-write'),
 ('spark.repl.local.jars',
  'file:///usr/local/spark/jars/aws-java-sdk-1.7.4.jar,file:///usr/local/spark/jars/hadoop-aws-2.7.4.jar'),
 ('spark.jars',
  '/usr/local/spark/jars/aws-java-sdk-1.7.4.jar,/usr/local/spark/jars/hadoop-aws-2.7.4.jar'),
 ('spark.app.id', 'local-1594186616260'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.driver.port', '35497'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.memory', '8g'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

But I am getting the following error even when reading a 1 MB file.

Py4JJavaError: An error occurred while calling o43.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, 192.168.0.49, executor driver): org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 888,879; received: 16,360)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

I tried changing the S3 read code to the version below (2) and it works, but then the RDD has to be converted to a DataFrame (a sketch of that conversion follows the snippet).

# 2) Read via the RDD API instead
data = spark_session.sparkContext.textFile("s3a://pyspark-lijo-test/auction.csv") \
        .map(lambda line: line.split(","))
data.take(2)  # RDDs have no show(); take() returns the first rows as lists of strings
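
Since that workaround leaves a plain RDD of split lines, a minimal sketch of converting it back to a DataFrame is shown below. It builds on the data RDD from snippet (2); the header-stripping step and the reuse of the header values as column names are assumptions, since the schema of auction.csv is not shown in the question. All columns come out as strings, so an explicit cast or schema would be needed for typed data.

header = data.first()                            # first split line holds the header values
rows = data.filter(lambda cols: cols != header)  # drop the header row
df_from_rdd = rows.toDF(header)                  # assumption: header values are valid column names
df_from_rdd.show(2)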

Why is the Spark SQL code (1) unable to read even a small file? Is there any setting that needs to be changed?

Recommended Answer

Found out the issue. There was some problem with my Spark 3.0 setup. Switching to the latest Spark 2.4.6 release made it work as expected.
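
For reference, a minimal sketch of what the working read might look like on Spark 2.4.6 is shown below. The pyspark==2.4.6 install, the use of spark.jars.packages in place of local jar paths, and the reuse of the access_key/secret_key variables from the question are assumptions; the point taken from the answer is simply to pair Spark 2.4.x (pre-built for Hadoop 2.7) with the matching hadoop-aws 2.7.x connector, which pulls in aws-java-sdk 1.7.4 transitively.

# Assumes: pip install pyspark==2.4.6  (pre-built for Hadoop 2.7)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName('s3-read-spark246') \
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.4') \
        .getOrCreate()

# access_key / secret_key loaded from ~/.aws/credentials as in the question
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

df = spark.read.csv("s3a://pyspark-lijo-test/auction.csv", header=True, mode="DROPMALFORMED")
df.show(2)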
