气流中的execution_date:需要作为变量访问 [英] execution_date in airflow: need to access as a variable
问题描述
我真的是这个论坛的新手。但是一段时间以来,我一直在为公司服务。抱歉,这个问题听起来真是愚蠢。
I am really a newbie in this forum. But I have been playing with airflow, for sometime, for our company. Sorry if this question sounds really dumb.
我正在使用BashOperators一堆编写管道。
基本上,对于每个任务,我只想使用'curl'调用REST api
I am writing a pipeline using bunch of BashOperators. Basically, for each Task, I want to simply call a REST api using 'curl'
这就是我的管道的样子(非常简化的版本):
This is what my pipeline looks like(very simplified version):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['xxxx@xxx.xxx'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
如果您注意到我在做 current_datetime = datetime_obj.now(tz = tz.tzlocal())
相反,我在这里想要的是 'execution_date' >
If you notice I am doing current_datetime= datetime_obj.now(tz=tz.tzlocal())
Instead what I want here is 'execution_date'
如何直接使用'execution_date'并将其分配给python文件中的变量?
How do I use 'execution_date' directly and assign it to a variable in my python file?
我遇到了访问args的一般问题。
任何帮助都会得到真正的感谢。
I have having this general issue of accessing args. Any help will be genuinely appreciated.
谢谢
推荐答案
BashOperator
的 bash_command
参数是一个模板 >。您可以使用<$ c $作为 datetime
object 在任何模板中访问 execution_date
c> execution_date 变量。在模板中,可以使用任何 jinja2
方法对其进行操作。
The BashOperator
's bash_command
argument is a template. You can access execution_date
in any template as a datetime
object using the execution_date
variable. In the template, you can use any jinja2
methods to manipulate it.
使用以下内容作为 BashOperator
bash_command
string :
Using the following as your BashOperator
bash_command
string:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}
# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
如果只希望与执行日期等效的字符串, ds
将返回日期戳(YYYY-MM-DD), ds_nodash
返回不带破折号的内容(YYYYMMDD),等等。有关宏
的更多信息,请参见 Api文档。
If you just want the string equivalent of the execution date, ds
will return a datestamp (YYYY-MM-DD), ds_nodash
returns same without dashes (YYYYMMDD), etc. More on macros
is available in the Api Docs.
您的最终运算符会看起来像这样:
Your final operator would look like:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
这篇关于气流中的execution_date:需要作为变量访问的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!