For Apache Airflow, how can I pass parameters when manually triggering a DAG via the CLI?
Question
I use Airflow to manage ETL task execution and scheduling. A DAG has been created and it works fine. But is it possible to pass parameters when manually triggering the DAG via the CLI?
For example: my DAG runs every day at 01:30 and processes yesterday's data (the time range from 01:30 yesterday to 01:30 today). There might be issues with the data source, and I then need to re-process that data, manually specifying the time range.
So can I create an Airflow DAG such that, when it runs on schedule, the default time range is from 01:30 yesterday to 01:30 today, and if anything goes wrong with the data source, I can manually trigger the DAG and pass the time range as parameters?
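One way to get that behavior is to fall back to the scheduled default window whenever no manual range was supplied. A minimal sketch of that logic as a plain helper function; the function name resolve_window and the conf keys start_ts/end_ts are illustrative, not part of the original question or of Airflow itself:

```python
from datetime import datetime, timedelta


def resolve_window(conf, execution_date):
    """Return the (start, end) processing window as ISO strings.

    If a manual trigger supplied start_ts/end_ts via --conf, use those;
    otherwise fall back to the scheduled default window of
    execution_date to execution_date + 1 day.
    """
    conf = conf or {}
    start = conf.get('start_ts') or execution_date.isoformat()
    end = conf.get('end_ts') or (execution_date + timedelta(days=1)).isoformat()
    return start, end


# Scheduled run: no conf, so the default one-day window applies
print(resolve_window(None, datetime(2019, 1, 1, 1, 30)))

# Manual re-run: values from --conf override the window
print(resolve_window({'start_ts': '2019-01-01T01:30:00',
                      'end_ts': '2019-01-02T01:30:00'},
                     datetime(2019, 1, 5, 1, 30)))
```

Inside a PythonOperator callable, conf would come from kwargs['dag_run'].conf (as shown in the accepted answer below) and execution_date from the task context.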
As far as I know, airflow test has a -tp option that can pass params to a task, but that is only for testing a specific task, and airflow trigger_dag doesn't have a -tp option. So is there any way to trigger_dag and pass parameters to the DAG, so that the Operator can read these parameters?
Thanks!
Answer
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field.
CLI:
airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'
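Applied to the re-processing scenario from the question, the same mechanism can carry a custom time range; the run id and the key names start_ts/end_ts below are illustrative, not a fixed Airflow convention:

```shell
airflow trigger_dag 'example_dag_conf' -r 'manual_rerun' \
    --conf '{"start_ts": "2019-01-01T01:30:00", "end_ts": "2019-01-02T01:30:00"}'
```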
DAG file:
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_dag_conf',
    default_args=args,
    schedule_interval=None,
)


def run_this_func(ds, **kwargs):
    # The --conf payload is available on the DagRun object in the task context
    print("Remotely received value of {} for key=message".format(
        kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}"',
    dag=dag,
)