Using Dask's NEW to_sql for improved efficiency (memory/speed) or alternative to get data from dask dataframe into SQL Server Table

Problem description

My ultimate goal is to use SQL/Python together for a project with too much data for pandas to handle (at least on my machine). So, I have gone with dask to:

  1. read data from several sources (mostly SQL Server tables/views) -- a minimal read sketch follows this list,
  2. process/merge that data into one large dask dataframe table of ~10 million rows and 52 columns, some of which contain long, unique strings, and
  3. write it back to SQL Server daily, so that my PowerBI report can refresh the data automatically.
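
A minimal sketch of what the reads in #1 can look like with dask's read_sql_table (the view name 'SomeView' and index column 'ID' are placeholders, and server, database and driver_name are the same connection pieces used in the code further down):

import dask.dataframe as dd

# Same style of connection string as the SQLAlchemy engine further down;
# server, database and driver_name are assumed to be defined elsewhere.
read_uri = f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}'

# 'SomeView' and 'ID' are placeholders -- read_sql_table needs an indexed
# (ideally numeric) column so it can split the read into partitions.
ddf_view = dd.read_sql_table('SomeView', read_uri, index_col='ID')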

For #1 and #2, they take ~30 seconds combined to execute, using minimal memory (several SQL queries and ~200 lines of code manipulating a large dataset with dask). Fast and Fun!!!

But #3 above has been the main bottleneck. What are some efficient ways, in terms of (1) memory and (2) speed (time to execute), to accomplish #3 with dask or other alternatives? See below for more background, as well as what I have tried and some conclusions I have come to.

For #1, #2 and #3 above, this has been a task I found impossible to do with pandas due to memory limitations/long execution times, but dask solved #1 and #2 with flying colors. I was still struggling with #3 -- getting the data back into a SQL table in an automated way without exporting to a .csv and then importing into SQL Server. I tried .compute() to transform the dask dataframe to a pandas dataframe and then write to_sql, but that kind of defeated the purpose of using dask to read/data model, and it was again running out of memory/taking forever to execute anyway.
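
For reference, that pandas route is essentially the two lines below (ddf is the combined dask dataframe, and the engine is created the same way as to_sql_uri in the code further down):

# Materialize every partition into a single in-memory pandas DataFrame, then use
# pandas' own to_sql -- this is the step that ran out of memory / took forever.
pdf = ddf.compute()
pdf.to_sql('PowerBI_Report', con=to_sql_uri, if_exists='replace', index=False)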

So, the new plan was to use to_csv to generate a new .csv daily and use a query to bulk insert the data into a table. I think this is still a viable solution; but, today, I was VERY happy to find out that dask released a new to_sql function (https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql). Leveraging existing StackOverflow articles/blogs about this topic (e.g. from Francois Leblanc - https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html), I tinkered with all of the parameters to find the most efficient combination that had the fastest time to execute (which matters A LOT when you are writing large datasets every single day for Reporting). This is what I found, which is similar to a lot of posts about pd.to_sql including Leblanc's:

import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

# Show progress for each partition as it is written
pbar = ProgressBar()
pbar.register()

# Windows authentication + fast_executemany=True
# (server, database and driver_name are defined earlier in my script)
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)

# ddf is the ~10 million row x 52 column dask dataframe built in steps #1 and #2
ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)

Using any combination of the following non-default parameters slowed down the time-to-execute for my to_sql (once again in agreement with what LeBlanc mentioned in his blog):

  1. chunksize=40 (40 is the max I could pass for 52 columns, per the 2098 SQL Server parameter limit -- see the quick arithmetic after this list),
  2. method='multi',
  3. parallel=True
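
The 40 above comes straight from that parameter limit; as a quick sanity check (2098 and 52 are the figures quoted above, not new measurements):

# With method='multi', every row in a chunk contributes one bind parameter per
# column, so the chunk size is capped by the parameter limit divided by the width.
param_limit = 2098   # SQL Server parameter limit quoted above
n_columns = 52       # width of the dataframe described above
max_chunksize = param_limit // n_columns
print(max_chunksize)  # 40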

Note: I realized that in addition to (or in replacement of) passing chunksize=40, I could have looped through my 33 dask dataframe partitions and processed each chunk to_sql individually. This would have been more memory efficient and may have also been quicker. One partition was taking 45 seconds to 1 minute, while doing the whole dask dataframe at once took > 1 hour for all partitions. I will try looping through all partitions and post an update if that was faster. An hour seems like a lot, but I felt completely blocked when trying to compute with pandas, which took all night or ran out of memory, so this is a STEP UP. Honestly, I'm happy enough with this and am probably going to build an .exe now with pyinstaller and have the .exe run daily, so that this is fully automated, and go from there, but I thought this would be helpful for others, as I have struggled with various solutions over the past couple of weeks.

Recommended answer

I tested writing the dataframe to SQL Server in partitions by looping through them, versus writing it all at once, and the time to complete everything was about the same either way.

import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

pbar = ProgressBar()
pbar.register()

# Windows authentication + fast_executemany=True
# (server, database and driver_name are defined earlier in my script)
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)

# From my question, I replaced the commented-out line below with the loop that
# follows, to see if there was a significant increase in speed. There was not;
# it was about the same as the code in the question.
# ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)

for i in range(ddf.npartitions):
    partition = ddf.get_partition(i)
    if i == 0:
        # First partition recreates the target table
        partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
    else:
        # Remaining partitions are appended to it
        partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='append', index=False)
