How to insert json response data in snowflake database more efficiently?
Problem description
I am currently looping through a json response and inserting each row one by one.
This is very slow even for a few thousand rows of data insert.
What is the most efficient way to insert the data?
Here is my code.
from module import usr, pwd, acct, db, schem, api_key
import snowflake.connector
import requests
import datetime

end_point = 'users'

def snowflake_connect():
    global cursor, mydb
    mydb = snowflake.connector.connect(
        user=usr,
        password=pwd,
        account=acct,
        database=db,
        schema=schem,
    )

def snowflake_insert(id, activated, name):
    global cursor
    snowflake_connect()
    cursor = mydb.cursor()
    sql_insert_query = """ INSERT INTO USERS(ID, ACTIVATED, NAME) VALUES (%s, %s, %s)"""
    insert_tuple = (id, activated, name)
    cursor.execute(sql_insert_query, insert_tuple)
    return cursor

def get_users():
    url = 'https://company.pipedrive.com/v1/{}?&api_token={}'.format(end_point, api_key)
    response = requests.request("GET", url).json()
    read_users(response)

def read_users(response):
    for data in response['data']:
        id = data['id']
        activated = data['activated']
        name = data['name']
        snowflake_insert(id, activated, name)

if __name__ == "__main__":
    snowflake_truncate()
    get_users()
    cursor.close()
As noted by others in the comments, to get the highest efficiency, especially for a continual load, the best practice is to load formatted data files directly into Snowflake (staged with PUT and loaded with COPY INTO) rather than using INSERT statements.
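As a sketch of that file-based route (the stage name and table name in the SQL comments are assumptions, not from the original post): rows from the JSON response can be written out as gzipped newline-delimited JSON, a format Snowflake's COPY INTO can ingest directly from a stage.

```python
import gzip
import json

# Hypothetical sample rows; in the real script these would come from the
# Pipedrive response['data'] list.
rows = [
    {"id": 1, "activated": True, "name": "Ada"},
    {"id": 2, "activated": False, "name": "Grace"},
]

def write_ndjson(rows, path):
    """Write rows as gzipped newline-delimited JSON (one object per line)."""
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")

write_ndjson(rows, "users.ndjson.gz")

# The file would then be uploaded and loaded with statements along these
# lines (shape only; the table stage and format are assumptions):
#   PUT file://users.ndjson.gz @%USERS;
#   COPY INTO USERS FROM @%USERS FILE_FORMAT = (TYPE = 'JSON');
```

This turns per-row network round trips into one bulk load, which is where the efficiency gain comes from.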
However, the code in the description can also be improved to minimise the overhead created per inserted row. A few key observations:

- The code creates a new connection object for every insert; this is unnecessary.
- Since the intention is only to run INSERT statements that do not need isolation, the cursor object can be reused.
- Multiple values can be sent per INSERT statement, using the multi-value feature, or expressed more simply using cursor.executemany(…).
- Switch to qmark (?) parameter formatting to avoid potential SQL injection.
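The batching point in the list above can be illustrated without a Snowflake account: sqlite3 from the standard library is also a DB-API driver with qmark (?) binding, so the executemany pattern has the same shape it does with the Snowflake connector.

```python
import sqlite3

# Stand-in demo using sqlite3; the table and rows mirror the USERS example.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE users (id INTEGER, activated INTEGER, name TEXT)")

rows = [(1, 1, "Ada"), (2, 0, "Grace"), (3, 1, "Edsger")]

# One executemany call replaces N execute calls: the statement is prepared
# once and all parameter sets are bound to it, instead of a round trip per row.
cursor.executemany(
    "INSERT INTO users (id, activated, name) VALUES (?, ?, ?)", rows
)
conn.commit()

count = cursor.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 3
```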
A modified code version:
from module import usr, pwd, acct, db, schem, api_key
import snowflake.connector
import requests
import datetime

# qmark (?) binding requires switching the connector's paramstyle
# before any connection is created (the default is pyformat/%s)
snowflake.connector.paramstyle = 'qmark'

end_point = 'users'
MYDB = None

def snowflake_connect():
    global MYDB
    if MYDB is None:
        MYDB = snowflake.connector.connect(
            user=usr,
            password=pwd,
            account=acct,
            database=db,
            schema=schem,
        )

def snowflake_insert_all(rows):
    snowflake_connect()
    cursor = MYDB.cursor()
    sql_insert_query = "INSERT INTO USERS(ID, ACTIVATED, NAME) VALUES (?, ?, ?)"
    cursor.executemany(sql_insert_query, rows)
    return cursor

def get_users():
    url = 'https://company.pipedrive.com/v1/{}?&api_token={}'.format(end_point, api_key)
    response = requests.request("GET", url).json()
    read_users(response)

def read_users(response):
    all_data = [(data['id'], data['activated'], data['name']) for data in response['data']]
    snowflake_insert_all(all_data)

if __name__ == "__main__":
    snowflake_truncate()
    get_users()
    if MYDB is not None:
        MYDB.close()
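If the response ever grows to hundreds of thousands of rows, a hypothetical chunking helper (not part of the original answer) keeps each executemany call bounded in size and memory:

```python
def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

# In read_users one could then call, per batch:
#   snowflake_insert_all(batch)
batches = list(chunked(list(range(10)), 4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```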
Note: I've only focussed on improving the Snowflake and DB-API interaction portions here, but in general there are other faults (variable and method naming, unnecessary use of globals, resource handling, etc.) in the way this script is written; Code Review can help if you're looking to improve the program further.