How to run SQL statement from Databricks cluster

Problem Description

I have an Azure Databricks cluster that processes various tables and then, as a final step, pushes these tables into an Azure SQL Server to be used by some other processes. I have a cell in Databricks that looks something like this:

def generate_connection():
  # Look up the SQL credentials in the Azure Key Vault-backed secret scope
  jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
  jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
  connectionProperties = {
    "user" : jdbcUsername,
    "password" : jdbcPassword,
    "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  }
  return connectionProperties

def generate_url():
  # Build the JDBC URL from the host and database names held in the secret scope
  jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
  jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
  jdbcPort = 1433
  return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)


def persist_table(table, sql_table, mode):
  # Write the Spark DataFrame to Azure SQL over JDBC with the given save mode
  jdbcUrl = generate_url()
  connectionProperties = generate_connection()
  table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)

persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")

This works as expected. The problem I have is that the Orders table is very large and only a small fraction of the rows could possibly change each day, so what I want to do is change the overwrite mode to append mode and change the data frame from being the entire table to just the rows that could have changed. All of this I know how to do easily enough, but what I want to do is run a simple SQL statement against the Azure SQL database to remove the rows that are already going to be there, so that the possibly changed rows can be inserted back.

The SQL statement that I want to run against the Azure SQL database is:

Delete From Sales.Orders Where CreateDate >= '01/01/2019'
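
For context, the append side of this plan would be a small change to the code above. A minimal sketch, assuming the changed rows can be identified by the same CreateDate column used in the DELETE statement (the cutoff date is illustrative):

# Narrow the DataFrame to just the rows that could have changed,
# then append them instead of overwriting the whole table
changed_orders = spark.table("Sales.Orders").where("CreateDate >= '2019-01-01'")
persist_table(changed_orders, "Sales.Orders", "append")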

Recommended Answer

You need to use the pyodbc library. You can then connect and run SQL statements directly.

import pyodbc

# Connect straight to the Azure SQL database; this requires the Microsoft
# ODBC driver to be installed on the cluster nodes
conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
                       'SERVER=mydatabase.database.windows.net;'
                       'DATABASE=AdventureWorks;UID=jonnyFast;'
                       'PWD=MyPassword')

# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
conn.commit()  # pyodbc does not autocommit by default

Unfortunately, getting it working on Databricks is a bit of a pain. I wrote a blog post a while back which should help: https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark
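
Putting the two pieces together, here is a minimal sketch of the delete-then-append flow, assuming the same Key Vault-backed secret scope and persist_table helper from the question, and reusing the changed_orders DataFrame from the earlier sketch (the cutoff date is illustrative):

import pyodbc

def delete_changed_rows(cutoff):
  # Reuse the question's secrets to build the ODBC connection string
  server = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
  database = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
  user = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
  password = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
  conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};'
                        'SERVER=' + server + ';DATABASE=' + database + ';'
                        'UID=' + user + ';PWD=' + password)
  # Parameterize the date rather than concatenating it into the statement
  conn.execute('DELETE FROM Sales.Orders WHERE CreateDate >= ?', (cutoff,))
  conn.commit()
  conn.close()

# Clear out the rows that are about to be re-inserted, then append the fresh copies
delete_changed_rows('2019-01-01')
persist_table(changed_orders, "Sales.Orders", "append")

Note that %pip install pyodbc only covers the Python package; the Microsoft ODBC driver itself must also be present on the cluster nodes (typically installed via an init script), which is presumably the pain the answer refers to.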
