Spark Exception when converting a MySQL table to parquet
Problem description
I'm trying to convert a remote MySQL table to a parquet file using Spark 1.6.2.
The process runs for 10 minutes, filling up memory, then starts emitting these messages:
WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
and at the end it fails with this error:
ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded
I'm running it in a spark-shell with these commands:
spark-shell --packages mysql:mysql-connector-java:5.1.26,org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G
val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()
dataframe_mysql.saveAsParquetFile("name.parquet")
I have limited the max executor memory to 12G. Is there a way to force writing the parquet file in "small" chunks, freeing memory?
Recommended answer
It seems the problem is that no partitioning is defined when you read your data with the JDBC connector.
默认情况下,不分发从JDBC进行读取,因此要启用分发,您必须设置手动分区.您需要一列是一个很好的分区键,并且必须先了解分布.
Reading from JDBC isn't distributed by default, so to enable distribution you have to set manual partitioning. You need a column which is a good partitioning key and you have to know distribution up front.
This is apparently what your data looks like:
root
|-- id: long (nullable = false)
|-- order_year: string (nullable = false)
|-- order_number: string (nullable = false)
|-- row_number: integer (nullable = false)
|-- product_code: string (nullable = false)
|-- name: string (nullable = false)
|-- quantity: integer (nullable = false)
|-- price: double (nullable = false)
|-- price_vat: double (nullable = false)
|-- created_at: timestamp (nullable = true)
|-- updated_at: timestamp (nullable = true)
order_year seemed like a good candidate to me (you seem to have ~20 years' worth, according to your comments).
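If the range of order_year isn't known up front, one way to derive the bounds is to push a MIN/MAX query down to MySQL before the partitioned read. This is only a sketch, assuming the same driver, connectionUrl and credential placeholders as the snippet below; "orders" is a hypothetical table name standing in for your real one:

```scala
// Sketch: derive lowerBound/upperBound from the table itself.
// Runs a single-row aggregate on the MySQL side via a subquery.
val boundsDf = sqlContext.read.format("jdbc")
  .options(Map(
    "driver"   -> driver,
    "url"      -> connectionUrl,
    // hypothetical table name "orders"; the subquery alias is required by MySQL
    "dbtable"  -> "(SELECT MIN(order_year) AS lo, MAX(order_year) AS hi FROM orders) AS b",
    "user"     -> userName,
    "password" -> password))
  .load()

val row = boundsDf.collect()(0)
val lowerBound = row.get(0).toString   // feed these into the partitioned read
val upperBound = row.get(1).toString
```

Note that in Spark 1.6 the partition column must be numeric; order_year is a string in your schema, but since it holds year values it compares numerically in the generated WHERE clauses.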
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = ???
val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???
// Manual partitioning
val partitionColumn: String = "order_year"
val options: Map[String, String] = Map("driver" -> driver,
"url" -> connectionUrl,
"dbtable" -> query,
"user" -> userName,
"password" -> password,
"partitionColumn" -> partitionColumn,
"lowerBound" -> "0",
"upperBound" -> "3000",
"numPartitions" -> "300"
)
val df = sqlContext.read.format("jdbc").options(options).load()
PS: partitionColumn, lowerBound, upperBound and numPartitions must all be specified if any of them is specified.
Now you can save your DataFrame to parquet.
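As a minimal sketch (assuming the df from the snippet above), the DataFrameWriter API available in 1.6 is preferable to the deprecated saveAsParquetFile, and repartitioning first gives you explicit control over output chunk size:

```scala
// Sketch: write the partitioned DataFrame out as parquet.
// repartition(300) is a hypothetical count matching numPartitions above;
// each partition becomes one parquet part-file, keeping memory per task small.
df.repartition(300)
  .write
  .mode("overwrite")   // replace output from any previous failed run
  .parquet("name.parquet")
```

Because each JDBC partition is now read and written independently, no single task has to hold the whole table in memory, which addresses the GC overhead error.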