Py4JJavaError: An error occurred while calling o57.showString. : org.apache.spark.SparkException:


Question

I am working with pyspark connected to an AWS instance (r5d.xlarge, 4 vCPUs, 32 GiB) running a 25 GB database. When I query some tables I get the error:

Py4JJavaError: An error occurred while calling o57.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

I tried to find out the error myself, but unfortunately there is not much information regarding this issue.

Code

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').\
     config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44').\
     appName('test').getOrCreate()

df = spark.read.format('jdbc').\
        option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306').\
        option('driver', 'com.mysql.jdbc.Driver').\
        option('user', 'xxxxxxxxxxx').\
        option('password', 'xxxxxxxxxxxxxxxxxxxx').\
        option('dbtable', 'dbname.tablename').\
        load()

df.printSchema()


Here I get the printSchema output, but then:

df_1 = df.select(['col1', 'col2', 'col3', 'col4', 
                  'col4', 'col5', 'col6']).show()

Py4JJavaError: An error occurred while calling o57.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
  in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
  0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC
  overhead limit exceeded


Does anybody have an idea how I can solve this problem?

Answer

Here is a method to parallelize serial JDBC reads across multiple Spark workers. You can use this as a guide and customize it to fit your source data; the main prerequisite is to have some kind of unique key to split on.

Please refer to this documentation, specifically the parameters partitionColumn, lowerBound, upperBound, and numPartitions:

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Some code examples:

# find min and max for column used to split on
from pyspark.sql.functions import min, max

minDF = df.select(min("id")).first()[0] # replace 'id' with your key col
maxDF = df.select(max("id")).first()[0] # replace 'id' with your key col
numSplits = 125 # you will need to tailor this value to your dataset ... you mentioned your source as 25GB so try 25000 MB / 200 MB = 125 partitions

print("df min: {}\df max: {}".format(minDF, maxDF))

# your code => add a few more parameters
# (parenthesized chaining instead of backslashes, so the inline comments are valid Python)
df = (spark.read.format('jdbc')
        .option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306')
        .option('driver', 'com.mysql.jdbc.Driver')
        .option('user', 'xxxxxxxxxxx')
        .option('password', 'xxxxxxxxxxxxxxxxxxxx')
        .option('dbtable', 'dbname.tablename')
        .option('partitionColumn', 'id')     # col to split on
        .option('lowerBound', minDF)         # min value
        .option('upperBound', maxDF)         # max value
        .option('numPartitions', numSplits)  # number of splits (partitions) Spark will distribute across executor workers
        .load())

print(df.rdd.getNumPartitions())
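
For intuition, Spark turns those four options into one bounded query per partition. The sketch below is illustrative only: it assumes id is an integer key and approximates the stride arithmetic (in practice the first and last partitions are left open-ended so keys outside the bounds are still read):

# illustrative sketch: roughly how Spark splits [lowerBound, upperBound]
# into numPartitions ranges, one JDBC query per range
stride = (maxDF - minDF) // numSplits
for i in range(numSplits):
    lo = minDF + i * stride
    hi = minDF + (i + 1) * stride
    # each partition issues roughly: SELECT ... WHERE id >= lo AND id < hi
    print("partition {}: WHERE id >= {} AND id < {}".format(i, lo, hi))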

Another example connection string: if you are using Spark 2.4, refer to this doc, which uses some cleaner code:

https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism

# assuming the same connection details as the options above
jdbcUrl = "jdbc:mysql://xx.xxx.xx.xxx:3306"
connectionProps = {
  "user": "xxxxxxxxxxx",
  "password": "xxxxxxxxxxxxxxxxxxxx",
  "driver": "com.mysql.jdbc.Driver"
}

sourceDF = spark.read.jdbc(
  url=jdbcUrl,
  table="dbname.tablename",
  column="id",
  lowerBound=minDF,
  upperBound=maxDF,
  numPartitions=125,
  properties=connectionProps
)
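
With the partitioned read in place, the select/show that previously triggered the OOM should run as many small distributed tasks instead of one oversized serial read. A minimal sanity check (the col1..col6 names are taken from the question and are assumptions about your schema):

df_1 = sourceDF.select(['col1', 'col2', 'col3', 'col4', 'col5', 'col6'])
df_1.show()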

