Spark Exception when converting a MySQL table to parquet

Problem description

I'm trying to convert a MySQL remote table to a parquet file using Spark 1.6.2.

The process runs for 10 minutes, filling up memory, then starts emitting these messages:

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval

At the end it fails with this error:

ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded

I'm running it in a spark-shell with these commands:

spark-shell --packages mysql:mysql-connector-java:5.1.26,org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()

dataframe_mysql.saveAsParquetFile("name.parquet")

I have limited the max executor memory to 12G. Is there a way to force writing the parquet file in "small" chunks, freeing memory?

Solution

It seemed like the problem was that you had no partition defined when you read your data with the JDBC connector.

Reading from JDBC isn't distributed by default, so to enable distribution you have to set up manual partitioning. You need a column that is a good partitioning key, and you have to know its distribution up front.

This is apparently what your data looks like:

root 
|-- id: long (nullable = false) 
|-- order_year: string (nullable = false) 
|-- order_number: string (nullable = false) 
|-- row_number: integer (nullable = false) 
|-- product_code: string (nullable = false) 
|-- name: string (nullable = false) 
|-- quantity: integer (nullable = false) 
|-- price: double (nullable = false) 
|-- price_vat: double (nullable = false) 
|-- created_at: timestamp (nullable = true) 
|-- updated_at: timestamp (nullable = true)

order_year seemed like a good candidate to me (you appear to have ~20 years of data according to your comments).

import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = ???

val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???

// Manual partitioning
val partitionColumn: String = "order_year"

val options: Map[String, String] = Map("driver" -> driver,
  "url" -> connectionUrl,
  "dbtable" -> query,
  "user" -> userName,
  "password" -> password,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> "0",
  "upperBound" -> "3000",
  "numPartitions" -> "300"
)

val df = sqlContext.read.format("jdbc").options(options).load()

PS: partitionColumn, lowerBound, upperBound, numPartitions: These options must all be specified if any of them is specified.
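
To see why you need to know the distribution up front, here is a rough standalone sketch (my approximation, not Spark's exact code) of how the JDBC source turns these four options into one strided WHERE clause per partition, using the example values from the options map above:

object JdbcPartitionSketch {
  def main(args: Array[String]): Unit = {
    // Example values from the options map above.
    val column        = "order_year"
    val lowerBound    = 0L
    val upperBound    = 3000L
    val numPartitions = 300

    // Width of each partition's value range.
    val stride = (upperBound - lowerBound) / numPartitions

    // One predicate per partition; each becomes a separate JDBC query,
    // so no single task has to load the whole table.
    val predicates = (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0)                      s"$column < $hi OR $column IS NULL"
      else if (i == numPartitions - 1) s"$column >= $lo"
      else                             s"$column >= $lo AND $column < $hi"
    }

    predicates.take(3).foreach(println)
  }
}

If lowerBound/upperBound are far away from the real range of order_year, almost all rows fall into the first or last partition and you are back to the original memory problem, which is why a column whose distribution you know matters.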

Now you can save your DataFrame to parquet.
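
With the DataFrame writer API in Spark 1.6 that would look roughly like this (a minimal sketch; "name.parquet" is just a placeholder output path):

// Write the now-partitioned DataFrame out as parquet; each JDBC partition
// is processed independently instead of the whole table at once.
df.write
  .mode("overwrite")       // replace the target directory if it already exists
  .parquet("name.parquet")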
