How to load SUBSET of large Oracle table into Dask dataframe?


Problem description

Here is what I have tried:

dask_rf = dd.from_pandas(pd.read_sql('select ...)', conn_cx_Oracle), npartitions = 10)

This gives me a 'large object' warning and recommends using client.scatter. The problem is that client.scatter appears to require the data to be loaded into a Pandas dataframe first, which is exactly what I am trying to avoid: RAM limitations are why I am using Dask in the first place.

The Oracle table is too large to read using Dask's read_sql_table because read_sql_table does not filter the table in any way.

Any ideas? Is Dask not applicable to my use case?

Edit - Per the answer below, and after researching how to do so, here is my attempt to convert to a SQLAlchemy expression:

from sqlalchemy import create_engine, Table, Column, String, MetaData, select
import dask.dataframe as dd

sql_engine = create_engine('oracle+cx_oracle://username:password@environment')
metadata = MetaData(bind=sql_engine)
table_reference = Table('table', metadata, autoload=True, schema='schema')
s = select([table_reference]).where(table_reference.c.field_to_filter == filtered_value)

dask_df = dd.read_sql_table(s, 'sqlalchemy_connection_string', 'index_col', schema='schema')

dask_df.count()

Dask Series Structure:
npartitions=1
action_timestamp    int64
vendor_name           ...
dtype: int64
Dask Name: dataframe-count-agg, 1996 tasks

dask_df.count().compute()

DatabaseError: (cx_Oracle.DatabaseError) ORA-02391: exceeded simultaneous SESSIONS_PER_USER limit (Background on this error at: http://sqlalche.me/e/4xp6)

Why is it trying to connect to Oracle?
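For context on that error: ORA-02391 means the database account hit its limit on simultaneous sessions, and each Dask partition opens its own connection when the graph executes. One way to stay under the limit while testing is to cap Dask's parallelism. A minimal sketch reusing dask_df from above; the scheduler choice and worker count are assumptions, not something from the original post:

# Run partitions one at a time in the calling process:
# at most one Oracle session open at any moment (slow but safe).
dask_df.count().compute(scheduler='synchronous')

# Or keep some parallelism but stay below SESSIONS_PER_USER,
# e.g. at most 4 threads -> roughly 4 concurrent sessions.
dask_df.count().compute(scheduler='threads', num_workers=4)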

Edit #2 - In case it's helpful, I have performed some additional tests. I wanted to prove that SQLAlchemy works on its own, and I verified that via:

result = sql_engine.execute(s)

type(result)

sqlalchemy.engine.result.ResultProxy

result.fetchone()

This showed a result row.

This seems to rule out SQLAlchemy/Oracle issues, so any ideas what to try next?

Recommended answer

I'm looking for the same thing right now.

So that you're not stuck: you may not have enough RAM, but you probably have plenty of free storage. So, a suggestion for now:

# imports
import pandas as pd 
import cx_Oracle as cx
import dask.dataframe as dd

# Connection stuff
...
conn = ...

# Query
qry = "SELECT * FROM HUGE_TABLE"

# Pandas Chunks
for ix, chunk in enumerate(pd.read_sql(qry, conn, chunksize=1000000)):
    chunk.to_csv(f"chunk_{ix}.csv", sep=";", index=False)  # or to_parquet

# Dask dataframe reading from the files (chunks)
dataset = dd.read_csv("chunk_*.csv", sep=";", blocksize=32e6)  # or read_parquet

Since this is IO intensive and you are performing sequential operations, it may take a while.

My suggestion for a quicker "export" is to partition your table and perform the chunk export in parallel, one job per partition, as in the sketch below.
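A rough illustration of that idea, assuming a hypothetical PART_KEY column that splits HUGE_TABLE into a handful of slices; the column name, partition values, connection string, and worker count below are placeholders, not details from the question:

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import cx_Oracle as cx

# Hypothetical values of a partitioning column in HUGE_TABLE
PART_VALUES = ["A", "B", "C", "D"]

def export_partition(part):
    # Each worker process opens (and closes) its own Oracle session
    conn = cx.connect("username/password@environment")
    qry = "SELECT * FROM HUGE_TABLE WHERE PART_KEY = :p"
    for ix, chunk in enumerate(pd.read_sql(qry, conn, params={"p": part}, chunksize=1000000)):
        chunk.to_csv(f"chunk_{part}_{ix}.csv", sep=";", index=False)
    conn.close()
    return part

if __name__ == "__main__":
    # Keep max_workers below the SESSIONS_PER_USER limit
    with ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(export_partition, PART_VALUES))

Each slice then lands as its own set of chunk files, and dd.read_csv("chunk_*.csv", sep=";") picks them all up exactly as above.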

