Spark: Reading big MySQL table into DataFrame fails


Problem Description

I'd like to point out in advance that several related questions, like the following, DO NOT address my problem:

  • Spark query running very slow
  • Converting mysql table to dataset is very slow...
  • Spark Will Not Load Large MySql Table
  • Spark MySQL Error while Reading from Database

This one comes close, but the stack trace is different and it is unresolved anyway. So rest assured that I'm posting this question after several days of (failed) solution-hunting.

I'm trying to write a job that moves data (once a day) from MySQL tables to Hive tables stored as Parquet/ORC files on Amazon S3. Some of the tables are quite big: ~300M records and 200 GB+ in size (as reported by phpMyAdmin).
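For context, the shape of the job is roughly the following; this Scala sketch is illustrative only, and jdbcUrl, the connection properties, the table name and the S3 path are placeholders rather than our actual configuration:

import java.util.Properties
import org.apache.spark.sql.SparkSession

// Minimal sketch of the daily move job; all connection details are placeholders
val spark = SparkSession.builder()
  .appName("mysql-to-s3")
  .enableHiveSupport()
  .getOrCreate()

val jdbcUrl = "jdbc:mysql://<host>:3306/<database>"
val connProps = new Properties()
connProps.setProperty("user", "<user>")
connProps.setProperty("password", "<password>")
connProps.setProperty("driver", "com.mysql.jdbc.Driver")

// Read the MySQL table into a DataFrame and persist it as Parquet on S3
val df = spark.read.jdbc(jdbcUrl, "<table>", connProps)
df.write.mode("overwrite").parquet("s3://<bucket>/<table>/")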

Currently we use sqoop for this, but we want to move to Spark for the following reasons:

  • To leverage its capabilities with the DataFrame API (in the future, we will be performing transformations while moving the data)
  • We already have a sizeable framework written in Scala for Spark jobs used elsewhere in the organization

I've been able to achieve this on small MySQL tables without any issue. But the Spark job (which reads data from MySQL into a DataFrame) fails if I try to fetch more than ~1.5-2M records at a time. I've shown the relevant portions of the stack trace below; you can find the complete stack trace here.

...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
...
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
...
org.apache.spark.status.api.v1.OneStageResource.taskSummary(OneStageResource.scala:62)
    at sun.reflect.GeneratedMethodAccessor188.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
...
[Stage 27:>                                                       (0 + 30) / 32]18/03/01 01:29:09 WARN TaskSetManager: Lost task 3.0 in stage 27.0 (TID 92, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal, executor 6): java.sql.SQLException: Incorrect key file for table '/rdsdbdata/tmp/#sql_14ae_5.MYI'; try to repair it
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
...

** This stack trace was obtained upon failure while moving a 148 GB table containing 186M records

As is apparent from the (full) stack trace, the Spark read job starts sulking with spurious None.get warnings, followed by SQLException: Incorrect key file for table.. (which is related to MySQL's tmp table becoming full).

Now clearly this can't be a MySQL problem, because in that case sqoop should fail as well. As far as Spark is concerned, I'm parallelizing the read operation by setting numPartitions = 32 (we use a parallelism of 40 with sqoop).
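A minimal sketch of how that partitioned read is set up (the partition column, its bounds and the table name are illustrative; jdbcUrl and connProps are the same placeholders as in the sketch above):

// Partitioned JDBC read: Spark issues 32 parallel range queries over the
// partition column (assumed here to be a numeric, indexed primary key)
val df = spark.read.jdbc(
  url = jdbcUrl,
  table = "<table>",
  columnName = "id",
  lowerBound = 1L,
  upperBound = 300000000L,   // roughly the row count of the biggest table
  numPartitions = 32,
  connectionProperties = connProps
)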

From my limited knowledge of Spark and Big Data, 148 GB shouldn't be overwhelming for Spark by any measure. Moreover, since MySQL, Spark (EMR) and S3 all reside in the same region (AWS AP-SouthEast), latency shouldn't be the bottleneck.

My questions are:

  1. Is Spark a suitable tool for this?
  2. Could Spark's JDBC driver be blamed for this issue?
  3. If the answer to the above question is
    • Yes: How can I overcome it (an alternate driver, or some other workaround)?
    • No: What could be the possible cause?


Framework Configurations:

  • Hadoop distribution: Amazon 2.8.3
  • Spark 2.2.1
  • Hive 2.3.2
  • Scala 2.11.11

EMR Configuration:

  • EMR 5.12.0
  • 1 Master: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS Storage: 32 GiB]
  • 1 Task: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS Storage: none]
  • 1 Core: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS Storage: 32 GiB]

** These are the configurations of the development cluster; the production cluster will be better equipped

Answer

The Spark JDBC API seems to fork and load all the data from the MySQL table into memory. So when you try to load a big table, what you should do is use the Spark API to clone the data to HDFS first (JSON should be used to keep the schema structure), like this:

spark.read.jdbc(jdbcUrl, tableName, prop)
       .write()
       .json("/fileName.json");

Then you can work on the data in HDFS as you normally would:

spark.read().json("/fileName.json")
       .createOrReplaceTempView(tableName);
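Assuming the SparkSession was created with Hive support enabled, one way to then finish the original goal (a Parquet-backed Hive table on S3) could be a write along these lines; the database/table names and the S3 path are placeholders:

// Read the staged JSON back from HDFS and write it out as a Parquet-backed
// Hive table on S3 (all names below are placeholders)
spark.read.json("/fileName.json")
  .write
  .mode("overwrite")
  .format("parquet")
  .option("path", "s3://<bucket>/<prefix>/")
  .saveAsTable("<db>.<table>")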
