Bulk data migration through Spark SQL

Problem description

I'm currently trying to bulk migrate the contents of a very large MySQL table into a Parquet file via Spark SQL. But when doing so, I quickly run out of memory, even when setting the driver's memory limit higher (I'm running Spark in local mode). Example code:

Dataset<Row> ds = spark.read()
    .format("jdbc")
    .option("url", url)
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "bigdatatable")
    .option("user", "root")
    .option("password", "foobar")
    .load();

ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");

It seems like Spark tries to read the entire table contents into memory, which isn't going to work out very well. So, what's the best approach to bulk data migration via Spark SQL?

Recommended answer

In your solution, Spark will read the entire table contents into a single partition before it starts writing. One way to avoid that is to partition the read, but it requires a numeric, sequential column in the source data:

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "bigdatatable")
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "64")
  .load();

In the example above, the column NUMERIC_COL must exist in the data and should, ideally, vary uniformly from 1 to 10000. Of course, those are demanding requirements and such a column will probably not exist, so you should either create a view in the database that adds a column like that, or add it in the query (note that I used generic SQL syntax; you will have to adapt it for your DBMS):

String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo";

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", query)
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL")
  .option("lowerBound", "0")
  .option("upperBound", "63")
  .option("numPartitions", "64")
  .load();
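
For completeness, here is a minimal end-to-end sketch that combines the partitioned read above with the Parquet write from the question. The SparkSession setup, the class name BulkMigration, and the jdbc:mysql URL are illustrative assumptions rather than part of the original answer; adapt them to your environment:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BulkMigration {
  public static void main(String[] args) {
    // Local-mode session for illustration; adjust master and memory for your setup.
    SparkSession spark = SparkSession.builder()
        .appName("mysql-to-parquet")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical connection details; replace with your own.
    String url = "jdbc:mysql://localhost:3306/mydb";
    String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo";

    // Partitioned read: Spark issues 64 parallel range queries on NUMERIC_COL
    // instead of pulling the whole table into a single partition.
    Dataset<Row> ds = spark.read()
        .format("jdbc")
        .option("url", url)
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", query)
        .option("user", "root")
        .option("password", "foobar")
        .option("partitionColumn", "NUMERIC_COL")
        .option("lowerBound", "0")
        .option("upperBound", "63")
        .option("numPartitions", "64")
        .load();

    // Each read partition is written as its own Parquet part file.
    ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");

    spark.stop();
  }
}

Each of the 64 read partitions translates into an independent JDBC query and is written out as its own Parquet part file, so no single task has to hold the whole table in memory.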
