For Apache Airflow, how can I pass parameters when manually triggering a DAG via the CLI?


Problem description


I use Airflow to manage the execution and scheduling of ETL tasks. I have created a DAG and it works fine. But is it possible to pass parameters when manually triggering the DAG via the CLI?


For example: my DAG runs every day at 01:30 and processes the data for the previous day (the time range from 01:30 yesterday to 01:30 today). Sometimes the data source has problems, and then I need to re-process that data for a manually specified time range.


So can I create an Airflow DAG such that, when it runs on schedule, the default time range is from 01:30 yesterday to 01:30 today? Then, if anything goes wrong with the data source, I would manually trigger the DAG and pass the time range as parameters.


As far as I know, airflow test has a -tp option that can pass params to a task, but it is only meant for testing a specific task, and airflow trigger_dag has no -tp option. So is there any way to trigger a DAG and pass parameters to it so that the operators can read them?

Thanks!

Recommended answer


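You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field. Inside a Python callable, the same dictionary is available as kwargs['dag_run'].conf. (airflow trigger_dag is the Airflow 1.10 command; in Airflow 2.x the equivalent is airflow dags trigger, which accepts the same --conf and -r/--run-id options.)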

CLI

airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'
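The --conf value must be a JSON object (double-quoted keys and strings), which is why the command above wraps it in single quotes for the shell. For the question's use case, a minimal sketch of building such a command with the standard library, assuming hypothetical conf keys "start" and "end" holding ISO-8601 timestamps (these key names are illustrative, not defined by Airflow) and the Airflow 1.10 command name:

```python
import json
import shlex
from datetime import datetime


def build_trigger_command(dag_id, start, end):
    """Return the argv for `airflow trigger_dag` (Airflow 1.10 syntax) with a
    time range passed through --conf.

    The "start"/"end" keys are hypothetical; the DAG must read the same keys
    from dag_run.conf.
    """
    # json.dumps always emits double-quoted keys and strings, i.e. valid JSON,
    # which is what --conf expects.
    conf = json.dumps({"start": start.isoformat(), "end": end.isoformat()})
    return ["airflow", "trigger_dag", dag_id, "--conf", conf]


if __name__ == "__main__":
    argv = build_trigger_command(
        "example_dag_conf",
        datetime(2024, 5, 1, 1, 30),
        datetime(2024, 5, 2, 1, 30),
    )
    # shlex.join (Python 3.8+) quotes each argument so the line can be pasted
    # into a shell as-is.
    print(shlex.join(argv))
```

Passing argv directly to subprocess.run(argv, check=True) avoids shell quoting altogether, since each argument reaches the airflow process unchanged.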

DAG file

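from datetime import datetime

from airflow import DAG
# Airflow 1.10 import paths; on Airflow 2.x use airflow.operators.bash and
# airflow.operators.python instead.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
    # Use a fixed start_date; dynamic values such as datetime.utcnow() are
    # discouraged by the Airflow docs because they change on every parse.
    'start_date': datetime(2021, 1, 1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_dag_conf',
    default_args=args,
    schedule_interval=None,  # no schedule: the DAG runs only when triggered
)


def run_this_func(ds, **kwargs):
    # dag_run.conf holds the JSON passed with --conf (None or {} if none was given)
    conf = kwargs['dag_run'].conf or {}
    print("Remotely received value of {} for key=message".format(conf.get('message')))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,  # required on Airflow 1.10; deprecated and unnecessary on 2.x
    python_callable=run_this_func,
    dag=dag,
)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run and dag_run.conf else "" }}" ',
    dag=dag,
)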
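Applied to the question, the callable can use the range from --conf when one is given and otherwise fall back to the scheduled interval. A minimal sketch, assuming the same hypothetical "start"/"end" conf keys as ISO-8601 strings, and relying on Airflow's convention that a scheduled run's execution_date marks the start of its interval (so for a daily 01:30 schedule, the run that fires at 01:30 today has an execution_date of 01:30 yesterday). The helper names resolve_time_range and process_data are illustrative; dag_run.conf may be None or empty for scheduled runs, and dag_run itself may be None under airflow test, so both cases are handled:

```python
from datetime import datetime, timedelta


def resolve_time_range(conf, execution_date, interval=timedelta(days=1)):
    """Return (start, end) of the data to process.

    conf: dag_run.conf (may be None or {} for scheduled runs).
    execution_date: the run's execution_date from the task context.
    The "start"/"end" keys are hypothetical ISO-8601 strings passed via --conf.
    """
    conf = conf or {}
    if "start" in conf and "end" in conf:
        # Manual re-run: use the range supplied on the command line.
        return (datetime.fromisoformat(conf["start"]),
                datetime.fromisoformat(conf["end"]))
    # Scheduled run: default to [execution_date, execution_date + interval).
    return execution_date, execution_date + interval


def process_data(**kwargs):
    """Hypothetical python_callable (use provide_context=True on Airflow 1.10)."""
    dag_run = kwargs.get("dag_run")
    conf = dag_run.conf if dag_run else None
    start, end = resolve_time_range(conf, kwargs["execution_date"])
    print("Processing data from {} to {}".format(start, end))
    return start, end
```

Note that a manual trigger without --conf falls back to a window starting at the trigger time, because that is what execution_date holds for manual runs. Also, datetime.fromisoformat returns a naive value unless the string carries a UTC offset (e.g. "2024-05-01T01:30:00+00:00"), while Airflow's execution_date is timezone-aware.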

