Spark: optimise writing a DataFrame to SQL Server

Problem description

I am using the code below to write a DataFrame of 43 columns and about 2,000,000 rows into a table in SQL Server:

dataFrame
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", url)
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .save()

Sadly, while it does work for small DataFrames, it's either extremely slow or times out for large ones. Any hints on how to optimize it?

I have tried setting rewriteBatchedStatements=true.
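
For reference, the built-in JDBC writer also exposes a batchsize option, and write parallelism can be limited by repartitioning before the write; a minimal sketch of those commonly tried tunings (the partition count and batch size below are illustrative, and they are not guaranteed to help):

// Commonly suggested tunings for the plain JDBC writer; values are illustrative.
dataFrame
  .repartition(8) // cap the number of concurrent connections to SQL Server
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", url)
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .option("batchsize", "10000") // rows per JDBC batch insert (default is 1000)
  .save()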

Thanks.

Recommended answer

We resorted to using the azure-sqldb-spark library instead of Spark's default built-in export functionality. This library gives you a bulkCopyToSqlDB method that does a real bulk insert and goes a lot faster. It's a bit less practical to use than the built-in functionality, but in my experience it's still worth it.

We use it more or less like this:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._

val options = Map(
  "url"          -> "***",
  "databaseName" -> "***",
  "user"         -> "***",
  "password"     -> "***",
  "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)

// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)

val bulkConfig = Config(options ++ Map(
  "dbTable"           -> "myTable",
  "bulkCopyBatchSize" -> "20000", // rows sent to SQL Server per bulk-copy batch
  "bulkCopyTableLock" -> "true",  // take a table lock during the load for speed
  "bulkCopyTimeout"   -> "600"    // bulk copy timeout, in seconds
))

df.bulkCopyToSqlDB(bulkConfig)

As you can see, we generate the CREATE TABLE query ourselves. You can let the library create the table, but it will just do dataFrame.limit(0).write.sqlDB(config), which can still be pretty inefficient, probably requires you to cache your DataFrame, and doesn't allow you to choose the SaveMode.
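
A minimal sketch of what such a dropAndCreateQuery helper could look like, assuming a simple mapping from Spark SQL types to SQL Server types (this helper is our own, not part of the library; adapt the type mapping to your schema):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

// Build a DROP + CREATE TABLE statement from the DataFrame schema so the
// target table exists with matching column types before the bulk copy.
def dropAndCreateQuery(df: DataFrame, table: String): String = {
  // Map the Spark SQL types we use to SQL Server column types; extend as needed.
  def sqlServerType(dt: DataType): String = dt match {
    case IntegerType    => "INT"
    case LongType       => "BIGINT"
    case DoubleType     => "FLOAT"
    case BooleanType    => "BIT"
    case DateType       => "DATE"
    case TimestampType  => "DATETIME2"
    case d: DecimalType => s"DECIMAL(${d.precision},${d.scale})"
    case _              => "NVARCHAR(MAX)" // fallback, e.g. for StringType
  }

  val columns = df.schema.fields
    .map(f => s"[${f.name}] ${sqlServerType(f.dataType)}")
    .mkString(", ")

  s"IF OBJECT_ID('$table', 'U') IS NOT NULL DROP TABLE $table; " +
    s"CREATE TABLE $table ($columns)"
}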

Also potentially interesting: we had to use an ExclusionRule when adding this library to our sbt build, or the assembly task would fail.

libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
  ExclusionRule(organization = "org.apache.spark")
)
