Bulk data migration through Spark SQL


Problem description

I'm currently trying to bulk migrate the contents of a very large MySQL table into a Parquet file via Spark SQL. But when doing so I quickly run out of memory, even when setting the driver's memory limit higher (I'm using Spark in local mode). Example code:

Dataset<Row> ds = spark.read()
    .format("jdbc")
    .option("url", url)
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "bigdatatable")
    .option("user", "root")
    .option("password", "foobar")
    .load();

ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");
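
For context, a minimal sketch of the session setup assumed in these snippets (the app name is illustrative). One caveat worth knowing: in local/client mode, spark.driver.memory is generally only honored if it is set before the driver JVM starts (for example via spark-submit --driver-memory), so raising it from inside an already-running application may have no effect.

import org.apache.spark.sql.SparkSession;

// Minimal local-mode session (illustrative values).
SparkSession spark = SparkSession.builder()
    .appName("bulk-migration")   // hypothetical app name
    .master("local[*]")
    .getOrCreate();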

It seems like Spark tries to read the entire table contents into memory, which isn't going to work out very well. So, what's the best approach to doing bulk data migration via Spark SQL?

Recommended answer

In your solution, Spark will read the entire table contents into one partition before it starts writing. One way to avoid that is to partition the reading part, but it requires a numeric, sequential column in the source data:

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "bigdatatable")
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "64")
  .load();
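
With those options, Spark splits the read into 64 bounded queries on NUMERIC_COL (roughly one WHERE NUMERIC_COL >= x AND NUMERIC_COL < y range per stride), so no single task has to pull the whole table. A quick way to sanity-check the split, as a sketch:

// Each JDBC partition becomes one Spark task; this should print 64 here.
System.out.println("partitions: " + ds.javaRDD().getNumPartitions());

// The physical plan shows the JDBC relation and its partitioning options.
ds.explain();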

In the example above, the column NUMERIC_COL must exist in the data and, ideally, should vary uniformly from 1 to 10000. Of course, those are a lot of requirements and such a column will probably not exist, so you should either create a view in the database with a column like that or add it in the query (note that I used generic SQL syntax, so you will have to adapt it for your DBMS):

String query = "(select mod(row_number() over (), 64) as NUMERIC_COL, bigdatatable.* from bigdatatable) as foo";

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", query)
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL")
  .option("lowerBound", "0")
  .option("upperBound", "63")
  .option("numPartitions", "64")
  .load();
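
The write side from the question can stay as it is; with 64 input partitions the Parquet write proceeds in parallel, one task (and output file) per partition:

ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");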
