Spark Exception when converting a MySQL table to parquet

Problem Description

I'm trying to convert a MySQL remote table to a parquet file using Spark 1.6.2.

The process runs for 10 minutes, filling up memory, then starts emitting these messages:

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval

At the end it fails with this error:

ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded

I'm running it in a spark-shell with these commands:

spark-shell --packages mysql:mysql-connector-java:5.1.26,org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()

dataframe_mysql.saveAsParquetFile("name.parquet")

I have limited the max executor memory to 12G. Is there a way to force writing the parquet file in "small" chunks, freeing memory?

Recommended Answer

It seems like the problem is that you have no partitioning defined when you read your data with the JDBC connector.

Reading from JDBC isn't distributed by default, so to enable distribution you have to set up manual partitioning. You need a column that is a good partitioning key, and you have to know its distribution up front.
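
You can see the lack of distribution directly (a minimal sketch, reusing the dataframe_mysql from the question; the partition count in the comment is the typical result, not something taken from your logs):

// A plain JDBC read without partitioning options lands in a single partition,
// so one task pulls the entire table, which is what exhausts memory.
dataframe_mysql.rdd.partitions.length  // typically 1 for a non-partitioned JDBC read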

This is apparently what your data looks like:

root 
|-- id: long (nullable = false) 
|-- order_year: string (nullable = false) 
|-- order_number: string (nullable = false) 
|-- row_number: integer (nullable = false) 
|-- product_code: string (nullable = false) 
|-- name: string (nullable = false) 
|-- quantity: integer (nullable = false) 
|-- price: double (nullable = false) 
|-- price_vat: double (nullable = false) 
|-- created_at: timestamp (nullable = true) 
|-- updated_at: timestamp (nullable = true)

order_year seems like a good candidate to me (according to your comments you have ~20 distinct years).

import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = ???

val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???

// Manual partitioning
val partitionColumn: String = "order_year"

val options: Map[String, String] = Map("driver" -> driver,
  "url" -> connectionUrl,
  "dbtable" -> query,
  "user" -> userName,
  "password" -> password,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> "0",
  "upperBound" -> "3000",
  "numPartitions" -> "300"
)

val df = sqlContext.read.format("jdbc").options(options).load()

PS: partitionColumn, lowerBound, upperBound, numPartitions: these options must all be specified if any of them is specified.
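
If you don't know the actual range of order_year up front, you can look it up first (a minimal sketch reusing the placeholder values defined above; your_table stands in for the real table name):

// Query the real min/max of the partition column once, and use the reported
// values for "lowerBound"/"upperBound" instead of guessing.
val boundsQuery = "(SELECT MIN(order_year) AS lo, MAX(order_year) AS hi FROM your_table) AS bounds"

sqlContext.read.format("jdbc")
  .option("driver", driver)
  .option("url", connectionUrl)
  .option("dbtable", boundsQuery)
  .option("user", userName)
  .option("password", password)
  .load()
  .show()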

Now you can save your DataFrame to parquet.
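
For example (a minimal sketch; df.write is the standard DataFrameWriter API in Spark 1.6, and name.parquet is the output path from the question):

// Each partition is written as its own part file, so no single task has to
// hold the whole table in memory.
df.write.mode("overwrite").parquet("name.parquet")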
