Spark: Reading big MySQL table into DataFrame fails
Question
I'd like to say in advance that several related questions, like the following, DO NOT address my problem:
- Spark query running very slow
- Converting mysql table to dataset is very slow...
- Spark Will Not Load Large MySql Table
- Spark MySQL Error while Reading from Database
This one comes close, but the stack-trace is different and it is unresolved anyway. So rest assured that I'm posting this question after several days of (failed) solution-hunting.
I'm trying to write a job that moves data (once a day) from MySQL tables to Hive tables stored as Parquet / ORC files on Amazon S3. Some of the tables are quite big: ~300M records, 200 GB+ in size (as reported by phpMyAdmin).
Currently we are using sqoop for this, but we want to move to Spark for the following reasons:
- To leverage its capabilities with the DataFrame API (in future, we will be performing transformations while moving the data)
- We already have a sizeable framework written in Scala for Spark jobs used elsewhere in the organization
I've been able to achieve this on small MySQL tables without any issue. But the Spark job (that reads data from MySQL into a DataFrame) fails if I try to fetch more than ~1.5-2M records at a time. I've shown the relevant portions of the stack-trace below; you can find the complete stack-trace here.
...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
...
Caused by: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
...
org.apache.spark.status.api.v1.OneStageResource.taskSummary(OneStageResource.scala:62)
at sun.reflect.GeneratedMethodAccessor188.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
...
[Stage 27:> (0 + 30) / 32]18/03/01 01:29:09 WARN TaskSetManager: Lost task 3.0 in stage 27.0 (TID 92, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal, executor 6): java.sql.SQLException: Incorrect key file for table '/rdsdbdata/tmp/#sql_14ae_5.MYI'; try to repair it
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
...
** This stack-trace was obtained upon failure while moving a 148 GB table containing 186M records
As is apparent from the (full) stack-trace, the Spark read job starts sulking with false warnings of the None.get error, followed by SQLException: Incorrect key file for table.. (which is related to MySQL's tmp table becoming full).
Now clearly this can't be a MySQL problem, because in that case sqoop should fail as well. As far as Spark is concerned, I'm parallelizing the read operation by setting numPartitions = 32 (we use a parallelism of 40 with sqoop).
From my limited knowledge of Spark and BigData, 148 GB shouldn't be overwhelming for Spark by any measure. Moreover, since MySQL, Spark (EMR) and S3 all reside in the same region (AWS AP-SouthEast), latency shouldn't be the bottleneck.
My questions are:
- Is Spark a suitable tool for this?
- Could Spark's Jdbc driver be blamed for this issue?
- If the answer to the above question is
  - Yes: How can I overcome it (alternate driver, or some other workaround)?
  - No: What could be the possible cause?
Framework Configurations:
- Hadoop distribution: Amazon 2.8.3
- Spark 2.2.1
- Hive 2.3.2
- Scala 2.11.11
EMR Configurations:
- EMR 5.12.0
- 1 Master: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS Storage: 32 GiB]
- 1 Task: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS Storage: none]
- 1 Core: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS Storage: 32 GiB]
** These are the configurations of the development cluster; the production cluster will be better equipped.
Answer
The Spark JDBC API seems to try to load all the data from the MySQL table into memory without streaming it. So when you try to load a big table, what you should do is clone the data to HDFS first using the Spark API (JSON should be used to keep the schema structure), like this:
spark.read.jdbc(jdbcUrl, tableName, prop)
  .write
  .json("/fileName.json")
Then you can work on HDFS normally instead:
spark.read.json("/fileName.json")
  .createOrReplaceTempView(tableName)
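Putting the two stages together for the S3/Hive target described in the question, a hedged end-to-end sketch follows. It assumes `spark`, `jdbcUrl`, `tableName` and `prop` are defined as above; the HDFS staging path and S3 bucket are made-up placeholders, and `saveAsTable` assumes the session was built with Hive support enabled:

```scala
// Stage 1: copy the raw rows out of MySQL into JSON on HDFS,
// so the expensive JDBC read happens exactly once.
spark.read.jdbc(jdbcUrl, tableName, prop)
  .write
  .mode("overwrite")
  .json(s"hdfs:///staging/$tableName")

// Stage 2: read the staged JSON back and persist it as a
// Parquet-backed Hive table whose data lives on S3.
spark.read.json(s"hdfs:///staging/$tableName")
  .write
  .mode("overwrite")
  .format("parquet")
  .option("path", s"s3://my-bucket/warehouse/$tableName")
  .saveAsTable(tableName)
```

The design choice here is simply to decouple the fragile JDBC extraction from the (retryable) S3 write, at the cost of one extra copy on HDFS.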