How to store SQL output to a pandas DataFrame using Airflow?


I want to store data from SQL to a Pandas dataframe, do some data transformations, and then load it into another table using Airflow.

The issue I am facing is that the connection strings to the tables are accessible only through Airflow. So I need to use Airflow as the medium to read and write data.

How can this be done?

My code

Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip limit 5",
    params={'limit': '50'},
    dag=dag
)

The output of the task needs to be stored in a dataframe (df) and, after transformations, loaded back into another table.

How can this be done?

Solution

I doubt there's a built-in operator for this, but you can easily write a custom operator:

  • Extend PostgresOperator, or just BaseOperator / any other operator of your choice. All custom code goes into the overridden execute() method
  • Then use PostgresHook to obtain a Pandas DataFrame by invoking the get_pandas_df() function
  • Perform whatever transformations you have to do in your pandas df
  • Finally, use the insert_rows() function to insert the data into the table
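If a full custom operator feels heavy, the same steps can also be sketched as a plain callable (e.g. for a PythonOperator) that uses PostgresHook directly. This is only a sketch: the connection id, SQL, column name, and destination table below are assumptions, not values from the original post.

```python
from typing import Any, List, Tuple

from pandas import DataFrame


def double_column(df: DataFrame, column: str) -> DataFrame:
    # example transformation: double one numeric column, leaving the input untouched
    df = df.copy()
    df[column] = df[column].multiply(2)
    return df


def transfer(**context: Any) -> None:
    # imported inside the callable so the module parses even without Airflow installed
    from airflow.hooks.postgres_hook import PostgresHook

    hook = PostgresHook(postgres_conn_id="REDSHIFT_CONN")  # assumed conn id
    # read the SQL result into a pandas DataFrame
    df: DataFrame = hook.get_pandas_df(sql="SELECT * FROM Western.trip LIMIT 5")
    df = double_column(df, "trip_distance")  # assumed column name
    # convert the DataFrame into a list of tuples for insert_rows()
    rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
    hook.insert_rows(table="Western.trip_copy", rows=rows)  # assumed table
```

Keeping the transformation in its own small function (double_column) makes it testable without a database connection.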

UPDATE-1

As requested, I'm hereby adding the code for the operator:

from typing import Dict, Any, List, Tuple

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame


class MyCustomOperator(PostgresOperator):

    @apply_defaults
    def __init__(self, destination_table: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.destination_table: str = destination_table

    def execute(self, context: Dict[str, Any]):
        # create PostgresHook
        self.hook: PostgresHook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                               schema=self.database)
        # read data from Postgres-SQL query into pandas DataFrame
        df: DataFrame = self.hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # perform transformations on df here
        df['column_to_be_doubled'] = df['column_to_be_doubled'].multiply(2)
        ..
        # convert pandas DataFrame into list of tuples
        rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
        # insert list of tuples in destination Postgres table
        self.hook.insert_rows(table=self.destination_table, rows=rows)

Note: The snippet is for reference only; it has NOT been tested
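For completeness, the operator above could be wired into a DAG roughly like this. This is a sketch in the same spirit as the snippet (untested); the dag_id, schedule, start date, and destination table are assumptions:

```python
from datetime import datetime

from airflow import DAG

# MyCustomOperator is the class defined in the snippet above
with DAG(
    dag_id="postgres_to_pandas_to_postgres",  # assumed dag id
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    transfer = MyCustomOperator(
        task_id="Task1",
        postgres_conn_id="REDSHIFT_CONN",
        sql="SELECT * FROM Western.trip LIMIT 5",
        destination_table="Western.trip_copy",  # assumed destination
    )
```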


Further modifications / improvements

  • The destination_table param can be read from an Airflow Variable
  • If the destination table doesn't necessarily reside in same Postgres schema, then we can take another param like destination_postgres_conn_id in __init__ and use that to create a destination_hook on which we can invoke insert_rows method
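The second improvement might look like the following sketch. The class name, the destination_postgres_conn_id handling, and the destination table are assumptions, and like the original snippet it has not been tested:

```python
from typing import Any, Dict

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults


class MyCrossDbOperator(PostgresOperator):
    # hypothetical variant that writes to a different Postgres connection

    @apply_defaults
    def __init__(self, destination_table: str,
                 destination_postgres_conn_id: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.destination_table: str = destination_table
        self.destination_postgres_conn_id: str = destination_postgres_conn_id

    def execute(self, context: Dict[str, Any]):
        # source hook: read the SQL query result into a pandas DataFrame
        source_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                   schema=self.database)
        df = source_hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # transformations on df go here
        rows = list(df.itertuples(index=False, name=None))
        # separate hook pointing at the destination database
        destination_hook = PostgresHook(
            postgres_conn_id=self.destination_postgres_conn_id)
        destination_hook.insert_rows(table=self.destination_table, rows=rows)
```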
