Airflow 2.0.2: How to pass parameters within postgres tasks using XCom?


Problem description

I am trying to pass params to the PostgresOperator in a dynamic way.

There are two tasks used to refresh the metadata:

1. Get the list of query ids (get_query_id_task).
2. Pass the list of ids to fetch and execute the query text (get_query_text_task).

get_query_id_task = PythonOperator(
    task_id='get_query_id',
    python_callable=query_and_push,
    # provide_context=True,  # not needed in Airflow 2.x; context is passed automatically
    op_kwargs={
        'sql': read_sql('warmupqueryid.sql')
    },
)


get_query_text_task = PostgresOperator(
    task_id='get_query_text',
    postgres_conn_id='redshift',
    trigger_rule=TriggerRule.ALL_DONE,
    params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }}"},
    sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
           FROM stl_querytext
           WHERE query in {{ macros.custom_macros.render_list_sql(params.query_ids) }};""",
)
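
The callables query_and_push and read_sql are not shown in the question. Below is a minimal sketch of what they might look like (an assumption, not the author's code): read_sql loads the SQL file bundled with the DAG, and query_and_push returns the full result set so the whole list of ids is pushed to XCom under the default return_value key.

# Hypothetical helpers, sketched for context (not from the original post).
from airflow.providers.postgres.hooks.postgres import PostgresHook

def read_sql(filename):
    # Load the SQL text that selects the query ids to warm up.
    with open(filename) as f:
        return f.read()

def query_and_push(sql, **_):
    hook = PostgresHook(postgres_conn_id='redshift')
    records = hook.get_records(sql)  # e.g. [(19343160,), (19350561,), ...]
    return records                   # the returned value is pushed to XCom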

The XCom push returns the list of query ids as below:

[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]

I used a plugin (custom macro) to render the pulled XCom value:

from airflow.plugins_manager import AirflowPlugin

def render_list_sql(li):
    # Flatten the list of (id,) tuples pulled from XCom into a tuple of ids.
    l = []
    for index, tup in enumerate(li):
        idd = tup[0]
        l.append(idd)
    return tuple(l)

# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "custom_macros"
    macros = [render_list_sql]
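
For the XCom value shown above, the macro flattens the list of single-element tuples into one tuple of ids, for example:

# Illustration only: what render_list_sql produces for the pulled XCom value.
ids = [(19343160,), (19350561,), (19351381,)]
print(render_list_sql(ids))  # -> (19343160, 19350561, 19351381)

Note that Jinja renders the returned tuple with Python's str(), so a single-id result would come out as (19343160,) with a trailing comma, which Redshift would likely reject; stripping the trailing comma (or returning a formatted string) is worth considering.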

Template render from the first task: the rendered template does not pass the parameter, and the XCom push value is a list of tuples.

Update: the problem has been solved using the solution provided below. However, I couldn't make it loop over the list of ids, so the rendered SQL only shows one id, and I am not sure how to loop over the ids.

Here is the log:

*** Reading remote log from s3://ob-airflow-pre/logs/Redshift_warm-up/get_query_text/2021-05-27T20:31:05.036338+00:00/1.log.
[2021-05-27 20:31:07,435] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,463] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-27 20:31:07,463] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,473] {taskinstance.py:1089} INFO - Executing <Task(PostgresOperator): get_query_text> on 2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,476] {standard_task_runner.py:52} INFO - Started process 384 to run task
[2021-05-27 20:31:07,479] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'Redshift_warm-up', 'get_query_text', '2021-05-27T20:31:05.036338+00:00', '--job-id', '2045', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/redshift_warm-up_dag.py', '--cfg-path', '/tmp/tmp8n32exly', '--error-file', '/tmp/tmp0bdhn3lj']
[2021-05-27 20:31:07,479] {standard_task_runner.py:77} INFO - Job 2045: Subtask get_query_text
[2021-05-27 20:31:07,645] {logging_mixin.py:104} INFO - Running <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-27 20:31:07,747] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=Redshift_warm-up
AIRFLOW_CTX_TASK_ID=get_query_text
AIRFLOW_CTX_EXECUTION_DATE=2021-05-27T20:31:05.036338+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,748] {postgres.py:69} INFO - Executing: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in (19343160);
[2021-05-27 20:31:07,767] {base.py:69} INFO - Using connection to: id: redshift. Host: sys-redshift-pre.oneboxtickets.net, Port: 5439, Schema: reports, Login: mstr_new, Password: XXXXXXXX, extra: XXXXXXXX
[2021-05-27 20:31:07,792] {dbapi.py:180} INFO - Running statement: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in (19343160);, parameters: None
[2021-05-27 20:31:08,727] {dbapi.py:186} INFO - Rows affected: 1
[2021-05-27 20:31:08,759] {taskinstance.py:1185} INFO - Marking task as SUCCESS. dag_id=Redshift_warm-up, task_id=get_query_text, execution_date=20210527T203105, start_date=20210527T203107, end_date=20210527T203108
[2021-05-27 20:31:08,806] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-05-27 20:31:08,814] {local_task_job.py:146} INFO - Task exited with return code 0
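
The log shows that only a single id (19343160) made it into the rendered IN clause. If the eventual goal is to run each warm-up query separately rather than as one aggregated statement, one possible approach is to loop over the pulled ids inside a single PythonOperator using PostgresHook. This is a sketch under assumptions, not from the original post; the task and function names are illustrative.

# Hypothetical sketch: execute each warm-up query in a loop.
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def warm_up_queries(ti, **_):
    hook = PostgresHook(postgres_conn_id='redshift')
    # task_ids must match the upstream task_id ('get_query_id' as defined above).
    id_rows = ti.xcom_pull(task_ids='get_query_id', key='return_value')
    for (query_id,) in id_rows:
        # Reassemble the query text for this id from stl_querytext ...
        rows = hook.get_records(
            """SELECT LISTAGG(CASE WHEN LEN(RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END, '')
                      WITHIN GROUP (ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query = %s;""",
            parameters=(query_id,),
        )
        # ... and execute it to warm the cluster up.
        for (query_text,) in rows:
            hook.run(query_text)

warm_up_task = PythonOperator(
    task_id='warm_up_queries',
    python_callable=warm_up_queries,
)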

Answer

The params argument is not a templated field, so the Jinja expression inside it is never rendered and gets passed through as a literal string. Move the expression directly into sql, which is templated:

get_query_text_task = PostgresOperator(
    task_id='get_query_text',
    postgres_conn_id='redshift',
    trigger_rule=TriggerRule.ALL_DONE,
    sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
           FROM stl_querytext
           WHERE query in ({{ macros.custom_macros.render_list_sql(ti.xcom_pull(task_ids='get_query_id_task', key='return_value')) }});""",
)
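
Why this works: sql is one of PostgresOperator's templated fields, while values inside params are passed through verbatim, so xcom_pull is only evaluated when it appears in sql. A quick way to confirm which fields go through Jinja rendering (assuming the postgres provider bundled with Airflow 2.0.2):

# Minimal check of which PostgresOperator fields are rendered by Jinja.
from airflow.providers.postgres.operators.postgres import PostgresOperator

print(PostgresOperator.template_fields)  # expected: ('sql',)

With the XCom value shown earlier, the macro then turns the list of tuples into a flat tuple of ids that is substituted into the IN clause at render time.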
