How to store SQL output to a pandas DataFrame using Airflow?
I want to store data from SQL into a pandas DataFrame, do some data transformations, and then load it into another table using Airflow.
The issue I am facing is that the connection strings to the tables are accessible only through Airflow. So I need to use Airflow as the medium to read and write the data.
How can this be done?
My code
Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip limit 5",
    params={'limit': '50'},
    dag=dag
)
The output of the task needs to be stored into a DataFrame (df) and, after transformations, loaded back into another table.
How can this be done?
I doubt there's a built-in operator for this. You can easily write a custom operator:

- Extend PostgresOperator (or just BaseOperator / any other operator of your choice). All custom code goes into the overridden execute() method.
- Then use PostgresHook to obtain a pandas DataFrame by invoking its get_pandas_df() function.
- Perform whatever transformations you have to do on your pandas df.
- Finally use the insert_rows() function to insert the data into the table.
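The middle steps (DataFrame in, transformed tuples out) can be sketched with pandas alone. Here the hand-built DataFrame merely stands in for what get_pandas_df() would return, and the column names are made up for illustration:

```python
from typing import Any, List, Tuple

import pandas as pd

# stand-in for the result of PostgresHook.get_pandas_df() (columns are hypothetical)
df = pd.DataFrame({'trip_id': [1, 2], 'distance': [3.5, 7.0]})

# example transformation: double one column
df['distance'] = df['distance'].multiply(2)

# convert the DataFrame into a list of plain tuples, the shape insert_rows() expects
rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
print(rows)  # [(1, 7.0), (2, 14.0)]
```

Passing index=False and name=None to itertuples() is what yields plain tuples rather than namedtuples with the index prepended.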
UPDATE-1

As requested, I'm hereby adding the code for the operator:
from typing import Dict, Any, List, Tuple

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame


class MyCustomOperator(PostgresOperator):

    @apply_defaults
    def __init__(self, destination_table: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.destination_table: str = destination_table

    def execute(self, context: Dict[str, Any]):
        # create PostgresHook
        self.hook: PostgresHook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                               schema=self.database)
        # read data from Postgres (SQL query) into pandas DataFrame
        df: DataFrame = self.hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # perform transformations on df here, e.g.
        df['column_to_be_doubled'] = df['column_to_be_doubled'].multiply(2)
        # ..
        # convert pandas DataFrame into list of tuples
        rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
        # insert list of tuples into destination Postgres table
        self.hook.insert_rows(table=self.destination_table, rows=rows)
Note: The snippet is for reference only; it has NOT been tested
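For completeness, wiring this operator into a DAG could look roughly like the following (an untested sketch: the connection id and SQL are taken from the question, while the module path, dag_id, schedule, and destination table name are all assumptions):

```python
from datetime import datetime

from airflow import DAG

# assumes MyCustomOperator from the snippet above lives in a module on the
# DAGs / plugins path named my_custom_operator.py (hypothetical path)
from my_custom_operator import MyCustomOperator

dag = DAG(
    dag_id='trip_etl',                 # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval='@daily',        # assumed schedule
)

Task1 = MyCustomOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip limit 5",
    destination_table='Western.trip_transformed',  # hypothetical target table
    dag=dag,
)
```

Because MyCustomOperator extends PostgresOperator, it accepts the same postgres_conn_id and sql arguments, plus the destination_table it adds.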
Further modifications / improvements

- The destination_table param can be read from a Variable.
- If the destination table doesn't necessarily reside in the same Postgres schema, then we can take another param like destination_postgres_conn_id in __init__ and use that to create a destination_hook, on which we can invoke the insert_rows method.
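The second improvement could be sketched as follows (untested; destination_postgres_conn_id and destination_hook are the names proposed above, not existing Airflow API, and the class name is made up):

```python
from typing import Any, Dict

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults


class MyCrossDbOperator(PostgresOperator):
    """Hypothetical variant: reads via postgres_conn_id, writes via a
    separate destination connection."""

    @apply_defaults
    def __init__(self, destination_table: str, destination_postgres_conn_id: str,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.destination_table = destination_table
        self.destination_postgres_conn_id = destination_postgres_conn_id

    def execute(self, context: Dict[str, Any]):
        # source hook: same connection the base PostgresOperator would use
        source_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                   schema=self.database)
        df = source_hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # .. transformations on df go here ..
        # destination hook: a second connection for the write side
        destination_hook = PostgresHook(
            postgres_conn_id=self.destination_postgres_conn_id)
        destination_hook.insert_rows(
            table=self.destination_table,
            rows=list(df.itertuples(index=False, name=None)))
```

The read and write sides are now decoupled, so the source and destination can live on entirely different Postgres instances as long as each has its own Airflow connection.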