How to use azure-sqldb-spark connector in pyspark


Question

I want to write around 10 GB of data to an Azure SQL Server database every day using PySpark. I am currently using the JDBC driver, which takes hours because it issues insert statements one by one.

I am planning to use the azure-sqldb-spark connector, which claims to speed up writes by using bulk insert.

I went through the official documentation: https://github.com/Azure/azure-sqldb-spark. The library is written in Scala and basically requires the use of two Scala classes:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "databaseName"      -> "MyDatabase",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig)

Can it be implemented in PySpark like this (using sc._jvm)?

Config = sc._jvm.com.microsoft.azure.sqldb.spark.config.Config
connect = sc._jvm.com.microsoft.azure.sqldb.spark.connect._

# all config

df.connect.bulkCopyToSqlDB(bulkCopyConfig)

I am not an expert in Python. Can anybody help me with a complete snippet to get this done?

Answer

The Spark connector currently (as of March 2019) only supports the Scala API (as documented here). So if you are working in a notebook, you could do all the preprocessing in Python, then register the DataFrame as a temp table, e.g.:

df.createOrReplaceTempView('testbulk')

and do the final step in Scala:

%scala
//configs...
spark.table("testbulk").bulkCopyToSqlDB(bulkCopyConfig)
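
For completeness, here is a sketch of that Scala cell with the configuration from the question filled in. The server, database, credentials, and target table are placeholders, and it assumes the azure-sqldb-spark JAR is attached to the cluster:

%scala
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Connection and bulk copy settings (placeholder values, taken from the question)
val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

// Read the temp view registered from the Python side and bulk copy it to Azure SQL
spark.table("testbulk").bulkCopyToSqlDB(bulkCopyConfig)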
